9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
36 def check_ip_checksum(self, pkt):
38 Check IP checksum of the packet
40 :param pkt: Packet to check IP checksum
42 new = pkt.__class__(str(pkt))
44 new = new.__class__(str(new))
45 self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
47 def check_tcp_checksum(self, pkt):
49 Check TCP checksum in IP packet
51 :param pkt: Packet to check TCP checksum
53 new = pkt.__class__(str(pkt))
55 new = new.__class__(str(new))
56 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
58 def check_udp_checksum(self, pkt):
60 Check UDP checksum in IP packet
62 :param pkt: Packet to check UDP checksum
64 new = pkt.__class__(str(pkt))
66 new = new.__class__(str(new))
67 self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
69 def check_icmp_errror_embedded(self, pkt):
71 Check ICMP error embeded packet checksum
73 :param pkt: Packet to check ICMP error embeded packet checksum
75 if pkt.haslayer(IPerror):
76 new = pkt.__class__(str(pkt))
77 del new['IPerror'].chksum
78 new = new.__class__(str(new))
79 self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
81 if pkt.haslayer(TCPerror):
82 new = pkt.__class__(str(pkt))
83 del new['TCPerror'].chksum
84 new = new.__class__(str(new))
85 self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
87 if pkt.haslayer(UDPerror):
88 if pkt['UDPerror'].chksum != 0:
89 new = pkt.__class__(str(pkt))
90 del new['UDPerror'].chksum
91 new = new.__class__(str(new))
92 self.assertEqual(new['UDPerror'].chksum,
93 pkt['UDPerror'].chksum)
95 if pkt.haslayer(ICMPerror):
96 del new['ICMPerror'].chksum
97 new = new.__class__(str(new))
98 self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
100 def check_icmp_checksum(self, pkt):
102 Check ICMP checksum in IPv4 packet
104 :param pkt: Packet to check ICMP checksum
106 new = pkt.__class__(str(pkt))
107 del new['ICMP'].chksum
108 new = new.__class__(str(new))
109 self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110 if pkt.haslayer(IPerror):
111 self.check_icmp_errror_embedded(pkt)
113 def check_icmpv6_checksum(self, pkt):
115 Check ICMPv6 checksum in IPv4 packet
117 :param pkt: Packet to check ICMPv6 checksum
119 new = pkt.__class__(str(pkt))
120 if pkt.haslayer(ICMPv6DestUnreach):
121 del new['ICMPv6DestUnreach'].cksum
122 new = new.__class__(str(new))
123 self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124 pkt['ICMPv6DestUnreach'].cksum)
125 self.check_icmp_errror_embedded(pkt)
126 if pkt.haslayer(ICMPv6EchoRequest):
127 del new['ICMPv6EchoRequest'].cksum
128 new = new.__class__(str(new))
129 self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130 pkt['ICMPv6EchoRequest'].cksum)
131 if pkt.haslayer(ICMPv6EchoReply):
132 del new['ICMPv6EchoReply'].cksum
133 new = new.__class__(str(new))
134 self.assertEqual(new['ICMPv6EchoReply'].cksum,
135 pkt['ICMPv6EchoReply'].cksum)
137 def create_stream_in(self, in_if, out_if, ttl=64):
139 Create packet stream for inside network
141 :param in_if: Inside interface
142 :param out_if: Outside interface
143 :param ttl: TTL of generated packets
147 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149 TCP(sport=self.tcp_port_in, dport=20))
153 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155 UDP(sport=self.udp_port_in, dport=20))
159 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
160 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
161 ICMP(id=self.icmp_id_in, type='echo-request'))
166 def compose_ip6(self, ip4, pref, plen):
168 Compose IPv4-embedded IPv6 addresses
170 :param ip4: IPv4 address
171 :param pref: IPv6 prefix
172 :param plen: IPv6 prefix length
173 :returns: IPv4-embedded IPv6 addresses
175 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
176 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
191 pref_n[10] = ip4_n[3]
195 pref_n[10] = ip4_n[2]
196 pref_n[11] = ip4_n[3]
199 pref_n[10] = ip4_n[1]
200 pref_n[11] = ip4_n[2]
201 pref_n[12] = ip4_n[3]
203 pref_n[12] = ip4_n[0]
204 pref_n[13] = ip4_n[1]
205 pref_n[14] = ip4_n[2]
206 pref_n[15] = ip4_n[3]
207 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
209 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
211 Create IPv6 packet stream for inside network
213 :param in_if: Inside interface
214 :param out_if: Outside interface
215 :param ttl: Hop Limit of generated packets
216 :param pref: NAT64 prefix
217 :param plen: NAT64 prefix length
221 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
223 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
226 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228 TCP(sport=self.tcp_port_in, dport=20))
232 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234 UDP(sport=self.udp_port_in, dport=20))
238 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
239 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
240 ICMPv6EchoRequest(id=self.icmp_id_in))
245 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
246 use_inside_ports=False):
248 Create packet stream for outside network
250 :param out_if: Outside interface
251 :param dst_ip: Destination IP address (Default use global NAT address)
252 :param ttl: TTL of generated packets
253 :param use_inside_ports: Use inside NAT ports as destination ports
254 instead of outside ports
257 dst_ip = self.nat_addr
258 if not use_inside_ports:
259 tcp_port = self.tcp_port_out
260 udp_port = self.udp_port_out
261 icmp_id = self.icmp_id_out
263 tcp_port = self.tcp_port_in
264 udp_port = self.udp_port_in
265 icmp_id = self.icmp_id_in
268 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
269 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
270 TCP(dport=tcp_port, sport=20))
274 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
275 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
276 UDP(dport=udp_port, sport=20))
280 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
281 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
282 ICMP(id=icmp_id, type='echo-reply'))
287 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
288 packet_num=3, dst_ip=None):
290 Verify captured packets on outside network
292 :param capture: Captured packets
293 :param nat_ip: Translated IP address (Default use global NAT address)
294 :param same_port: Sorce port number is not translated (Default False)
295 :param packet_num: Expected number of packets (Default 3)
296 :param dst_ip: Destination IP address (Default do not verify)
299 nat_ip = self.nat_addr
300 self.assertEqual(packet_num, len(capture))
301 for packet in capture:
303 self.check_ip_checksum(packet)
304 self.assertEqual(packet[IP].src, nat_ip)
305 if dst_ip is not None:
306 self.assertEqual(packet[IP].dst, dst_ip)
307 if packet.haslayer(TCP):
309 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
312 packet[TCP].sport, self.tcp_port_in)
313 self.tcp_port_out = packet[TCP].sport
314 self.check_tcp_checksum(packet)
315 elif packet.haslayer(UDP):
317 self.assertEqual(packet[UDP].sport, self.udp_port_in)
320 packet[UDP].sport, self.udp_port_in)
321 self.udp_port_out = packet[UDP].sport
324 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
326 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
327 self.icmp_id_out = packet[ICMP].id
328 self.check_icmp_checksum(packet)
330 self.logger.error(ppp("Unexpected or invalid packet "
331 "(outside network):", packet))
334 def verify_capture_in(self, capture, in_if, packet_num=3):
336 Verify captured packets on inside network
338 :param capture: Captured packets
339 :param in_if: Inside interface
340 :param packet_num: Expected number of packets (Default 3)
342 self.assertEqual(packet_num, len(capture))
343 for packet in capture:
345 self.check_ip_checksum(packet)
346 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
347 if packet.haslayer(TCP):
348 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
349 self.check_tcp_checksum(packet)
350 elif packet.haslayer(UDP):
351 self.assertEqual(packet[UDP].dport, self.udp_port_in)
353 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
354 self.check_icmp_checksum(packet)
356 self.logger.error(ppp("Unexpected or invalid packet "
357 "(inside network):", packet))
360 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
362 Verify captured IPv6 packets on inside network
364 :param capture: Captured packets
365 :param src_ip: Source IP
366 :param dst_ip: Destination IP address
367 :param packet_num: Expected number of packets (Default 3)
369 self.assertEqual(packet_num, len(capture))
370 for packet in capture:
372 self.assertEqual(packet[IPv6].src, src_ip)
373 self.assertEqual(packet[IPv6].dst, dst_ip)
374 if packet.haslayer(TCP):
375 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
376 self.check_tcp_checksum(packet)
377 elif packet.haslayer(UDP):
378 self.assertEqual(packet[UDP].dport, self.udp_port_in)
379 self.check_udp_checksum(packet)
381 self.assertEqual(packet[ICMPv6EchoReply].id,
383 self.check_icmpv6_checksum(packet)
385 self.logger.error(ppp("Unexpected or invalid packet "
386 "(inside network):", packet))
389 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
391 Verify captured packet that don't have to be translated
393 :param capture: Captured packets
394 :param ingress_if: Ingress interface
395 :param egress_if: Egress interface
397 for packet in capture:
399 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
400 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
401 if packet.haslayer(TCP):
402 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
403 elif packet.haslayer(UDP):
404 self.assertEqual(packet[UDP].sport, self.udp_port_in)
406 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
408 self.logger.error(ppp("Unexpected or invalid packet "
409 "(inside network):", packet))
412 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
413 packet_num=3, icmp_type=11):
415 Verify captured packets with ICMP errors on outside network
417 :param capture: Captured packets
418 :param src_ip: Translated IP address or IP address of VPP
419 (Default use global NAT address)
420 :param packet_num: Expected number of packets (Default 3)
421 :param icmp_type: Type of error ICMP packet
422 we are expecting (Default 11)
425 src_ip = self.nat_addr
426 self.assertEqual(packet_num, len(capture))
427 for packet in capture:
429 self.assertEqual(packet[IP].src, src_ip)
430 self.assertTrue(packet.haslayer(ICMP))
432 self.assertEqual(icmp.type, icmp_type)
433 self.assertTrue(icmp.haslayer(IPerror))
434 inner_ip = icmp[IPerror]
435 if inner_ip.haslayer(TCPerror):
436 self.assertEqual(inner_ip[TCPerror].dport,
438 elif inner_ip.haslayer(UDPerror):
439 self.assertEqual(inner_ip[UDPerror].dport,
442 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
444 self.logger.error(ppp("Unexpected or invalid packet "
445 "(outside network):", packet))
448 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
451 Verify captured packets with ICMP errors on inside network
453 :param capture: Captured packets
454 :param in_if: Inside interface
455 :param packet_num: Expected number of packets (Default 3)
456 :param icmp_type: Type of error ICMP packet
457 we are expecting (Default 11)
459 self.assertEqual(packet_num, len(capture))
460 for packet in capture:
462 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
463 self.assertTrue(packet.haslayer(ICMP))
465 self.assertEqual(icmp.type, icmp_type)
466 self.assertTrue(icmp.haslayer(IPerror))
467 inner_ip = icmp[IPerror]
468 if inner_ip.haslayer(TCPerror):
469 self.assertEqual(inner_ip[TCPerror].sport,
471 elif inner_ip.haslayer(UDPerror):
472 self.assertEqual(inner_ip[UDPerror].sport,
475 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
477 self.logger.error(ppp("Unexpected or invalid packet "
478 "(inside network):", packet))
481 def create_stream_frag(self, src_if, dst, sport, dport, data):
483 Create fragmented packet stream
485 :param src_if: Source interface
486 :param dst: Destination IPv4 address
487 :param sport: Source TCP port
488 :param dport: Destination TCP port
489 :param data: Payload data
492 id = random.randint(0, 65535)
493 p = (IP(src=src_if.remote_ip4, dst=dst) /
494 TCP(sport=sport, dport=dport) /
496 p = p.__class__(str(p))
497 chksum = p['TCP'].chksum
499 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
500 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
501 TCP(sport=sport, dport=dport, chksum=chksum) /
504 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
505 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
506 proto=IP_PROTOS.tcp) /
509 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
510 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
516 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
517 pref=None, plen=0, frag_size=128):
519 Create fragmented packet stream
521 :param src_if: Source interface
522 :param dst: Destination IPv4 address
523 :param sport: Source TCP port
524 :param dport: Destination TCP port
525 :param data: Payload data
526 :param pref: NAT64 prefix
527 :param plen: NAT64 prefix length
528 :param fragsize: size of fragments
532 dst_ip6 = ''.join(['64:ff9b::', dst])
534 dst_ip6 = self.compose_ip6(dst, pref, plen)
536 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
537 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
538 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
539 TCP(sport=sport, dport=dport) /
542 return fragment6(p, frag_size)
544 def reass_frags_and_verify(self, frags, src, dst):
546 Reassemble and verify fragmented packet
548 :param frags: Captured fragments
549 :param src: Source IPv4 address to verify
550 :param dst: Destination IPv4 address to verify
552 :returns: Reassembled IPv4 packet
554 buffer = StringIO.StringIO()
556 self.assertEqual(p[IP].src, src)
557 self.assertEqual(p[IP].dst, dst)
558 self.check_ip_checksum(p)
559 buffer.seek(p[IP].frag * 8)
560 buffer.write(p[IP].payload)
561 ip = frags[0].getlayer(IP)
562 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
563 proto=frags[0][IP].proto)
564 if ip.proto == IP_PROTOS.tcp:
565 p = (ip / TCP(buffer.getvalue()))
566 self.check_tcp_checksum(p)
567 elif ip.proto == IP_PROTOS.udp:
568 p = (ip / UDP(buffer.getvalue()))
571 def reass_frags_and_verify_ip6(self, frags, src, dst):
573 Reassemble and verify fragmented packet
575 :param frags: Captured fragments
576 :param src: Source IPv6 address to verify
577 :param dst: Destination IPv6 address to verify
579 :returns: Reassembled IPv6 packet
581 buffer = StringIO.StringIO()
583 self.assertEqual(p[IPv6].src, src)
584 self.assertEqual(p[IPv6].dst, dst)
585 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
586 buffer.write(p[IPv6ExtHdrFragment].payload)
587 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
588 nh=frags[0][IPv6ExtHdrFragment].nh)
589 if ip.nh == IP_PROTOS.tcp:
590 p = (ip / TCP(buffer.getvalue()))
591 self.check_tcp_checksum(p)
592 elif ip.nh == IP_PROTOS.udp:
593 p = (ip / UDP(buffer.getvalue()))
596 def verify_ipfix_nat44_ses(self, data):
598 Verify IPFIX NAT44 session create/delete event
600 :param data: Decoded IPFIX data records
602 nat44_ses_create_num = 0
603 nat44_ses_delete_num = 0
604 self.assertEqual(6, len(data))
607 self.assertIn(ord(record[230]), [4, 5])
608 if ord(record[230]) == 4:
609 nat44_ses_create_num += 1
611 nat44_ses_delete_num += 1
613 self.assertEqual(self.pg0.remote_ip4n, record[8])
614 # postNATSourceIPv4Address
615 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
618 self.assertEqual(struct.pack("!I", 0), record[234])
619 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
620 if IP_PROTOS.icmp == ord(record[4]):
621 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
622 self.assertEqual(struct.pack("!H", self.icmp_id_out),
624 elif IP_PROTOS.tcp == ord(record[4]):
625 self.assertEqual(struct.pack("!H", self.tcp_port_in),
627 self.assertEqual(struct.pack("!H", self.tcp_port_out),
629 elif IP_PROTOS.udp == ord(record[4]):
630 self.assertEqual(struct.pack("!H", self.udp_port_in),
632 self.assertEqual(struct.pack("!H", self.udp_port_out),
635 self.fail("Invalid protocol")
636 self.assertEqual(3, nat44_ses_create_num)
637 self.assertEqual(3, nat44_ses_delete_num)
639 def verify_ipfix_addr_exhausted(self, data):
641 Verify IPFIX NAT addresses event
643 :param data: Decoded IPFIX data records
645 self.assertEqual(1, len(data))
648 self.assertEqual(ord(record[230]), 3)
650 self.assertEqual(struct.pack("!I", 0), record[283])
653 class TestNAT44(MethodHolder):
654 """ NAT44 Test Cases """
658 super(TestNAT44, cls).setUpClass()
661 cls.tcp_port_in = 6303
662 cls.tcp_port_out = 6303
663 cls.udp_port_in = 6304
664 cls.udp_port_out = 6304
665 cls.icmp_id_in = 6305
666 cls.icmp_id_out = 6305
667 cls.nat_addr = '10.0.0.3'
668 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
669 cls.ipfix_src_port = 4739
670 cls.ipfix_domain_id = 1
672 cls.create_pg_interfaces(range(10))
673 cls.interfaces = list(cls.pg_interfaces[0:4])
675 for i in cls.interfaces:
680 cls.pg0.generate_remote_hosts(3)
681 cls.pg0.configure_ipv4_neighbors()
683 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
684 cls.vapi.ip_table_add_del(10, is_add=1)
685 cls.vapi.ip_table_add_del(20, is_add=1)
687 cls.pg4._local_ip4 = "172.16.255.1"
688 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
689 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
690 cls.pg4.set_table_ip4(10)
691 cls.pg5._local_ip4 = "172.17.255.3"
692 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
693 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
694 cls.pg5.set_table_ip4(10)
695 cls.pg6._local_ip4 = "172.16.255.1"
696 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
697 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
698 cls.pg6.set_table_ip4(20)
699 for i in cls.overlapping_interfaces:
707 cls.pg9.generate_remote_hosts(2)
709 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
710 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
714 cls.pg9.resolve_arp()
715 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
716 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
717 cls.pg9.resolve_arp()
720 super(TestNAT44, cls).tearDownClass()
723 def clear_nat44(self):
725 Clear NAT44 configuration.
727 # I found no elegant way to do this
728 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
729 dst_address_length=32,
730 next_hop_address=self.pg7.remote_ip4n,
731 next_hop_sw_if_index=self.pg7.sw_if_index,
733 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
734 dst_address_length=32,
735 next_hop_address=self.pg8.remote_ip4n,
736 next_hop_sw_if_index=self.pg8.sw_if_index,
739 for intf in [self.pg7, self.pg8]:
740 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
742 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
747 if self.pg7.has_ip4_config:
748 self.pg7.unconfig_ip4()
750 interfaces = self.vapi.nat44_interface_addr_dump()
751 for intf in interfaces:
752 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
753 twice_nat=intf.twice_nat,
756 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
757 domain_id=self.ipfix_domain_id)
758 self.ipfix_src_port = 4739
759 self.ipfix_domain_id = 1
761 interfaces = self.vapi.nat44_interface_dump()
762 for intf in interfaces:
763 if intf.is_inside > 1:
764 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
767 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
771 interfaces = self.vapi.nat44_interface_output_feature_dump()
772 for intf in interfaces:
773 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
777 static_mappings = self.vapi.nat44_static_mapping_dump()
778 for sm in static_mappings:
779 self.vapi.nat44_add_del_static_mapping(
781 sm.external_ip_address,
782 local_port=sm.local_port,
783 external_port=sm.external_port,
784 addr_only=sm.addr_only,
786 protocol=sm.protocol,
787 twice_nat=sm.twice_nat,
790 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
791 for lb_sm in lb_static_mappings:
792 self.vapi.nat44_add_del_lb_static_mapping(
797 twice_nat=lb_sm.twice_nat,
802 identity_mappings = self.vapi.nat44_identity_mapping_dump()
803 for id_m in identity_mappings:
804 self.vapi.nat44_add_del_identity_mapping(
805 addr_only=id_m.addr_only,
808 sw_if_index=id_m.sw_if_index,
810 protocol=id_m.protocol,
813 adresses = self.vapi.nat44_address_dump()
814 for addr in adresses:
815 self.vapi.nat44_add_del_address_range(addr.ip_address,
817 twice_nat=addr.twice_nat,
820 self.vapi.nat_set_reass()
821 self.vapi.nat_set_reass(is_ip6=1)
823 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
824 local_port=0, external_port=0, vrf_id=0,
825 is_add=1, external_sw_if_index=0xFFFFFFFF,
826 proto=0, twice_nat=0):
828 Add/delete NAT44 static mapping
830 :param local_ip: Local IP address
831 :param external_ip: External IP address
832 :param local_port: Local port number (Optional)
833 :param external_port: External port number (Optional)
834 :param vrf_id: VRF ID (Default 0)
835 :param is_add: 1 if add, 0 if delete (Default add)
836 :param external_sw_if_index: External interface instead of IP address
837 :param proto: IP protocol (Mandatory if port specified)
838 :param twice_nat: 1 if translate external host address and port
841 if local_port and external_port:
843 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
844 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
845 self.vapi.nat44_add_del_static_mapping(
848 external_sw_if_index,
857 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
859 Add/delete NAT44 address
861 :param ip: IP address
862 :param is_add: 1 if add, 0 if delete (Default add)
863 :param twice_nat: twice NAT address for extenal hosts
865 nat_addr = socket.inet_pton(socket.AF_INET, ip)
866 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
870 def test_dynamic(self):
871 """ NAT44 dynamic translation test """
873 self.nat44_add_address(self.nat_addr)
874 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
875 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
879 pkts = self.create_stream_in(self.pg0, self.pg1)
880 self.pg0.add_stream(pkts)
881 self.pg_enable_capture(self.pg_interfaces)
883 capture = self.pg1.get_capture(len(pkts))
884 self.verify_capture_out(capture)
887 pkts = self.create_stream_out(self.pg1)
888 self.pg1.add_stream(pkts)
889 self.pg_enable_capture(self.pg_interfaces)
891 capture = self.pg0.get_capture(len(pkts))
892 self.verify_capture_in(capture, self.pg0)
894 def test_dynamic_icmp_errors_in2out_ttl_1(self):
895 """ NAT44 handling of client packets with TTL=1 """
897 self.nat44_add_address(self.nat_addr)
898 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
899 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
902 # Client side - generate traffic
903 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
904 self.pg0.add_stream(pkts)
905 self.pg_enable_capture(self.pg_interfaces)
908 # Client side - verify ICMP type 11 packets
909 capture = self.pg0.get_capture(len(pkts))
910 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
912 def test_dynamic_icmp_errors_out2in_ttl_1(self):
913 """ NAT44 handling of server packets with TTL=1 """
915 self.nat44_add_address(self.nat_addr)
916 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
917 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
920 # Client side - create sessions
921 pkts = self.create_stream_in(self.pg0, self.pg1)
922 self.pg0.add_stream(pkts)
923 self.pg_enable_capture(self.pg_interfaces)
926 # Server side - generate traffic
927 capture = self.pg1.get_capture(len(pkts))
928 self.verify_capture_out(capture)
929 pkts = self.create_stream_out(self.pg1, ttl=1)
930 self.pg1.add_stream(pkts)
931 self.pg_enable_capture(self.pg_interfaces)
934 # Server side - verify ICMP type 11 packets
935 capture = self.pg1.get_capture(len(pkts))
936 self.verify_capture_out_with_icmp_errors(capture,
937 src_ip=self.pg1.local_ip4)
939 def test_dynamic_icmp_errors_in2out_ttl_2(self):
940 """ NAT44 handling of error responses to client packets with TTL=2 """
942 self.nat44_add_address(self.nat_addr)
943 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
944 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
947 # Client side - generate traffic
948 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
949 self.pg0.add_stream(pkts)
950 self.pg_enable_capture(self.pg_interfaces)
953 # Server side - simulate ICMP type 11 response
954 capture = self.pg1.get_capture(len(pkts))
955 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
956 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
957 ICMP(type=11) / packet[IP] for packet in capture]
958 self.pg1.add_stream(pkts)
959 self.pg_enable_capture(self.pg_interfaces)
962 # Client side - verify ICMP type 11 packets
963 capture = self.pg0.get_capture(len(pkts))
964 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
966 def test_dynamic_icmp_errors_out2in_ttl_2(self):
967 """ NAT44 handling of error responses to server packets with TTL=2 """
969 self.nat44_add_address(self.nat_addr)
970 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
971 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
974 # Client side - create sessions
975 pkts = self.create_stream_in(self.pg0, self.pg1)
976 self.pg0.add_stream(pkts)
977 self.pg_enable_capture(self.pg_interfaces)
980 # Server side - generate traffic
981 capture = self.pg1.get_capture(len(pkts))
982 self.verify_capture_out(capture)
983 pkts = self.create_stream_out(self.pg1, ttl=2)
984 self.pg1.add_stream(pkts)
985 self.pg_enable_capture(self.pg_interfaces)
988 # Client side - simulate ICMP type 11 response
989 capture = self.pg0.get_capture(len(pkts))
990 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
991 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
992 ICMP(type=11) / packet[IP] for packet in capture]
993 self.pg0.add_stream(pkts)
994 self.pg_enable_capture(self.pg_interfaces)
997 # Server side - verify ICMP type 11 packets
998 capture = self.pg1.get_capture(len(pkts))
999 self.verify_capture_out_with_icmp_errors(capture)
1001 def test_ping_out_interface_from_outside(self):
1002 """ Ping NAT44 out interface from outside network """
1004 self.nat44_add_address(self.nat_addr)
1005 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1006 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1009 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1010 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1011 ICMP(id=self.icmp_id_out, type='echo-request'))
1013 self.pg1.add_stream(pkts)
1014 self.pg_enable_capture(self.pg_interfaces)
1016 capture = self.pg1.get_capture(len(pkts))
1017 self.assertEqual(1, len(capture))
1020 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1021 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1022 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1023 self.assertEqual(packet[ICMP].type, 0) # echo reply
1025 self.logger.error(ppp("Unexpected or invalid packet "
1026 "(outside network):", packet))
1029 def test_ping_internal_host_from_outside(self):
1030 """ Ping internal host from outside network """
1032 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1033 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1034 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1038 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1039 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1040 ICMP(id=self.icmp_id_out, type='echo-request'))
1041 self.pg1.add_stream(pkt)
1042 self.pg_enable_capture(self.pg_interfaces)
1044 capture = self.pg0.get_capture(1)
1045 self.verify_capture_in(capture, self.pg0, packet_num=1)
1046 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1049 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1050 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1051 ICMP(id=self.icmp_id_in, type='echo-reply'))
1052 self.pg0.add_stream(pkt)
1053 self.pg_enable_capture(self.pg_interfaces)
1055 capture = self.pg1.get_capture(1)
1056 self.verify_capture_out(capture, same_port=True, packet_num=1)
1057 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1059 def test_forwarding(self):
1060 """ NAT44 forwarding test """
1062 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1063 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1065 self.vapi.nat44_forwarding_enable_disable(1)
1067 real_ip = self.pg0.remote_ip4n
1068 alias_ip = self.nat_addr_n
1069 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1070 external_ip=alias_ip)
1073 # in2out - static mapping match
1075 pkts = self.create_stream_out(self.pg1)
1076 self.pg1.add_stream(pkts)
1077 self.pg_enable_capture(self.pg_interfaces)
1079 capture = self.pg0.get_capture(len(pkts))
1080 self.verify_capture_in(capture, self.pg0)
1082 pkts = self.create_stream_in(self.pg0, self.pg1)
1083 self.pg0.add_stream(pkts)
1084 self.pg_enable_capture(self.pg_interfaces)
1086 capture = self.pg1.get_capture(len(pkts))
1087 self.verify_capture_out(capture, same_port=True)
1089 # in2out - no static mapping match
1091 host0 = self.pg0.remote_hosts[0]
1092 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1094 pkts = self.create_stream_out(self.pg1,
1095 dst_ip=self.pg0.remote_ip4,
1096 use_inside_ports=True)
1097 self.pg1.add_stream(pkts)
1098 self.pg_enable_capture(self.pg_interfaces)
1100 capture = self.pg0.get_capture(len(pkts))
1101 self.verify_capture_in(capture, self.pg0)
1103 pkts = self.create_stream_in(self.pg0, self.pg1)
1104 self.pg0.add_stream(pkts)
1105 self.pg_enable_capture(self.pg_interfaces)
1107 capture = self.pg1.get_capture(len(pkts))
1108 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1111 self.pg0.remote_hosts[0] = host0
1114 self.vapi.nat44_forwarding_enable_disable(0)
1115 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1116 external_ip=alias_ip,
1119 def test_static_in(self):
1120 """ 1:1 NAT initialized from inside network """
1122 nat_ip = "10.0.0.10"
1123 self.tcp_port_out = 6303
1124 self.udp_port_out = 6304
1125 self.icmp_id_out = 6305
1127 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1128 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1129 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1133 pkts = self.create_stream_in(self.pg0, self.pg1)
1134 self.pg0.add_stream(pkts)
1135 self.pg_enable_capture(self.pg_interfaces)
1137 capture = self.pg1.get_capture(len(pkts))
1138 self.verify_capture_out(capture, nat_ip, True)
1141 pkts = self.create_stream_out(self.pg1, nat_ip)
1142 self.pg1.add_stream(pkts)
1143 self.pg_enable_capture(self.pg_interfaces)
1145 capture = self.pg0.get_capture(len(pkts))
1146 self.verify_capture_in(capture, self.pg0)
1148 def test_static_out(self):
1149 """ 1:1 NAT initialized from outside network """
1151 nat_ip = "10.0.0.20"
1152 self.tcp_port_out = 6303
1153 self.udp_port_out = 6304
1154 self.icmp_id_out = 6305
1156 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1157 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1158 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1162 pkts = self.create_stream_out(self.pg1, nat_ip)
1163 self.pg1.add_stream(pkts)
1164 self.pg_enable_capture(self.pg_interfaces)
1166 capture = self.pg0.get_capture(len(pkts))
1167 self.verify_capture_in(capture, self.pg0)
1170 pkts = self.create_stream_in(self.pg0, self.pg1)
1171 self.pg0.add_stream(pkts)
1172 self.pg_enable_capture(self.pg_interfaces)
1174 capture = self.pg1.get_capture(len(pkts))
1175 self.verify_capture_out(capture, nat_ip, True)
1177 def test_static_with_port_in(self):
1178 """ 1:1 NAPT initialized from inside network """
1180 self.tcp_port_out = 3606
1181 self.udp_port_out = 3607
1182 self.icmp_id_out = 3608
1184 self.nat44_add_address(self.nat_addr)
1185 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1186 self.tcp_port_in, self.tcp_port_out,
1187 proto=IP_PROTOS.tcp)
1188 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1189 self.udp_port_in, self.udp_port_out,
1190 proto=IP_PROTOS.udp)
1191 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1192 self.icmp_id_in, self.icmp_id_out,
1193 proto=IP_PROTOS.icmp)
1194 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1195 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1199 pkts = self.create_stream_in(self.pg0, self.pg1)
1200 self.pg0.add_stream(pkts)
1201 self.pg_enable_capture(self.pg_interfaces)
1203 capture = self.pg1.get_capture(len(pkts))
1204 self.verify_capture_out(capture)
1207 pkts = self.create_stream_out(self.pg1)
1208 self.pg1.add_stream(pkts)
1209 self.pg_enable_capture(self.pg_interfaces)
1211 capture = self.pg0.get_capture(len(pkts))
1212 self.verify_capture_in(capture, self.pg0)
1214 def test_static_with_port_out(self):
1215 """ 1:1 NAPT initialized from outside network """
1217 self.tcp_port_out = 30606
1218 self.udp_port_out = 30607
1219 self.icmp_id_out = 30608
1221 self.nat44_add_address(self.nat_addr)
1222 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1223 self.tcp_port_in, self.tcp_port_out,
1224 proto=IP_PROTOS.tcp)
1225 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1226 self.udp_port_in, self.udp_port_out,
1227 proto=IP_PROTOS.udp)
1228 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1229 self.icmp_id_in, self.icmp_id_out,
1230 proto=IP_PROTOS.icmp)
1231 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1232 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1236 pkts = self.create_stream_out(self.pg1)
1237 self.pg1.add_stream(pkts)
1238 self.pg_enable_capture(self.pg_interfaces)
1240 capture = self.pg0.get_capture(len(pkts))
1241 self.verify_capture_in(capture, self.pg0)
1244 pkts = self.create_stream_in(self.pg0, self.pg1)
1245 self.pg0.add_stream(pkts)
1246 self.pg_enable_capture(self.pg_interfaces)
1248 capture = self.pg1.get_capture(len(pkts))
1249 self.verify_capture_out(capture)
1251 def test_static_vrf_aware(self):
1252 """ 1:1 NAT VRF awareness """
1254 nat_ip1 = "10.0.0.30"
1255 nat_ip2 = "10.0.0.40"
1256 self.tcp_port_out = 6303
1257 self.udp_port_out = 6304
1258 self.icmp_id_out = 6305
1260 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1262 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1264 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1266 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1267 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1269 # inside interface VRF match NAT44 static mapping VRF
1270 pkts = self.create_stream_in(self.pg4, self.pg3)
1271 self.pg4.add_stream(pkts)
1272 self.pg_enable_capture(self.pg_interfaces)
1274 capture = self.pg3.get_capture(len(pkts))
1275 self.verify_capture_out(capture, nat_ip1, True)
1277 # inside interface VRF don't match NAT44 static mapping VRF (packets
1279 pkts = self.create_stream_in(self.pg0, self.pg3)
1280 self.pg0.add_stream(pkts)
1281 self.pg_enable_capture(self.pg_interfaces)
1283 self.pg3.assert_nothing_captured()
1285 def test_identity_nat(self):
1286 """ Identity NAT """
1288 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1289 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1290 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1293 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1294 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1295 TCP(sport=12345, dport=56789))
1296 self.pg1.add_stream(p)
1297 self.pg_enable_capture(self.pg_interfaces)
1299 capture = self.pg0.get_capture(1)
1304 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1305 self.assertEqual(ip.src, self.pg1.remote_ip4)
1306 self.assertEqual(tcp.dport, 56789)
1307 self.assertEqual(tcp.sport, 12345)
1308 self.check_tcp_checksum(p)
1309 self.check_ip_checksum(p)
1311 self.logger.error(ppp("Unexpected or invalid packet:", p))
1314 def test_static_lb(self):
1315 """ NAT44 local service load balancing """
1316 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1319 server1 = self.pg0.remote_hosts[0]
1320 server2 = self.pg0.remote_hosts[1]
1322 locals = [{'addr': server1.ip4n,
1325 {'addr': server2.ip4n,
1329 self.nat44_add_address(self.nat_addr)
1330 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1333 local_num=len(locals),
1335 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1336 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1339 # from client to service
1340 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1341 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1342 TCP(sport=12345, dport=external_port))
1343 self.pg1.add_stream(p)
1344 self.pg_enable_capture(self.pg_interfaces)
1346 capture = self.pg0.get_capture(1)
1352 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1353 if ip.dst == server1.ip4:
1357 self.assertEqual(tcp.dport, local_port)
1358 self.check_tcp_checksum(p)
1359 self.check_ip_checksum(p)
1361 self.logger.error(ppp("Unexpected or invalid packet:", p))
1364 # from service back to client
1365 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1366 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1367 TCP(sport=local_port, dport=12345))
1368 self.pg0.add_stream(p)
1369 self.pg_enable_capture(self.pg_interfaces)
1371 capture = self.pg1.get_capture(1)
1376 self.assertEqual(ip.src, self.nat_addr)
1377 self.assertEqual(tcp.sport, external_port)
1378 self.check_tcp_checksum(p)
1379 self.check_ip_checksum(p)
1381 self.logger.error(ppp("Unexpected or invalid packet:", p))
1387 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1389 for client in clients:
1390 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1391 IP(src=client, dst=self.nat_addr) /
1392 TCP(sport=12345, dport=external_port))
1394 self.pg1.add_stream(pkts)
1395 self.pg_enable_capture(self.pg_interfaces)
1397 capture = self.pg0.get_capture(len(pkts))
1399 if p[IP].dst == server1.ip4:
1403 self.assertTrue(server1_n > server2_n)
1405 def test_multiple_inside_interfaces(self):
1406 """ NAT44 multiple non-overlapping address space inside interfaces """
1408 self.nat44_add_address(self.nat_addr)
1409 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1410 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1411 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1414 # between two NAT44 inside interfaces (no translation)
1415 pkts = self.create_stream_in(self.pg0, self.pg1)
1416 self.pg0.add_stream(pkts)
1417 self.pg_enable_capture(self.pg_interfaces)
1419 capture = self.pg1.get_capture(len(pkts))
1420 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1422 # from NAT44 inside to interface without NAT44 feature (no translation)
1423 pkts = self.create_stream_in(self.pg0, self.pg2)
1424 self.pg0.add_stream(pkts)
1425 self.pg_enable_capture(self.pg_interfaces)
1427 capture = self.pg2.get_capture(len(pkts))
1428 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1430 # in2out 1st interface
1431 pkts = self.create_stream_in(self.pg0, self.pg3)
1432 self.pg0.add_stream(pkts)
1433 self.pg_enable_capture(self.pg_interfaces)
1435 capture = self.pg3.get_capture(len(pkts))
1436 self.verify_capture_out(capture)
1438 # out2in 1st interface
1439 pkts = self.create_stream_out(self.pg3)
1440 self.pg3.add_stream(pkts)
1441 self.pg_enable_capture(self.pg_interfaces)
1443 capture = self.pg0.get_capture(len(pkts))
1444 self.verify_capture_in(capture, self.pg0)
1446 # in2out 2nd interface
1447 pkts = self.create_stream_in(self.pg1, self.pg3)
1448 self.pg1.add_stream(pkts)
1449 self.pg_enable_capture(self.pg_interfaces)
1451 capture = self.pg3.get_capture(len(pkts))
1452 self.verify_capture_out(capture)
1454 # out2in 2nd interface
1455 pkts = self.create_stream_out(self.pg3)
1456 self.pg3.add_stream(pkts)
1457 self.pg_enable_capture(self.pg_interfaces)
1459 capture = self.pg1.get_capture(len(pkts))
1460 self.verify_capture_in(capture, self.pg1)
1462 def test_inside_overlapping_interfaces(self):
1463 """ NAT44 multiple inside interfaces with overlapping address space """
1465 static_nat_ip = "10.0.0.10"
1466 self.nat44_add_address(self.nat_addr)
1467 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1469 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1470 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1471 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1472 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1475 # between NAT44 inside interfaces with same VRF (no translation)
1476 pkts = self.create_stream_in(self.pg4, self.pg5)
1477 self.pg4.add_stream(pkts)
1478 self.pg_enable_capture(self.pg_interfaces)
1480 capture = self.pg5.get_capture(len(pkts))
1481 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1483 # between NAT44 inside interfaces with different VRF (hairpinning)
1484 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1485 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1486 TCP(sport=1234, dport=5678))
1487 self.pg4.add_stream(p)
1488 self.pg_enable_capture(self.pg_interfaces)
1490 capture = self.pg6.get_capture(1)
1495 self.assertEqual(ip.src, self.nat_addr)
1496 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1497 self.assertNotEqual(tcp.sport, 1234)
1498 self.assertEqual(tcp.dport, 5678)
1500 self.logger.error(ppp("Unexpected or invalid packet:", p))
1503 # in2out 1st interface
1504 pkts = self.create_stream_in(self.pg4, self.pg3)
1505 self.pg4.add_stream(pkts)
1506 self.pg_enable_capture(self.pg_interfaces)
1508 capture = self.pg3.get_capture(len(pkts))
1509 self.verify_capture_out(capture)
1511 # out2in 1st interface
1512 pkts = self.create_stream_out(self.pg3)
1513 self.pg3.add_stream(pkts)
1514 self.pg_enable_capture(self.pg_interfaces)
1516 capture = self.pg4.get_capture(len(pkts))
1517 self.verify_capture_in(capture, self.pg4)
1519 # in2out 2nd interface
1520 pkts = self.create_stream_in(self.pg5, self.pg3)
1521 self.pg5.add_stream(pkts)
1522 self.pg_enable_capture(self.pg_interfaces)
1524 capture = self.pg3.get_capture(len(pkts))
1525 self.verify_capture_out(capture)
1527 # out2in 2nd interface
1528 pkts = self.create_stream_out(self.pg3)
1529 self.pg3.add_stream(pkts)
1530 self.pg_enable_capture(self.pg_interfaces)
1532 capture = self.pg5.get_capture(len(pkts))
1533 self.verify_capture_in(capture, self.pg5)
1536 addresses = self.vapi.nat44_address_dump()
1537 self.assertEqual(len(addresses), 1)
1538 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1539 self.assertEqual(len(sessions), 3)
1540 for session in sessions:
1541 self.assertFalse(session.is_static)
1542 self.assertEqual(session.inside_ip_address[0:4],
1543 self.pg5.remote_ip4n)
1544 self.assertEqual(session.outside_ip_address,
1545 addresses[0].ip_address)
1546 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1547 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1548 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1549 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1550 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1551 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1552 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1553 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1554 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1556 # in2out 3rd interface
1557 pkts = self.create_stream_in(self.pg6, self.pg3)
1558 self.pg6.add_stream(pkts)
1559 self.pg_enable_capture(self.pg_interfaces)
1561 capture = self.pg3.get_capture(len(pkts))
1562 self.verify_capture_out(capture, static_nat_ip, True)
1564 # out2in 3rd interface
1565 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1566 self.pg3.add_stream(pkts)
1567 self.pg_enable_capture(self.pg_interfaces)
1569 capture = self.pg6.get_capture(len(pkts))
1570 self.verify_capture_in(capture, self.pg6)
1572 # general user and session dump verifications
1573 users = self.vapi.nat44_user_dump()
1574 self.assertTrue(len(users) >= 3)
1575 addresses = self.vapi.nat44_address_dump()
1576 self.assertEqual(len(addresses), 1)
1578 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1580 for session in sessions:
1581 self.assertEqual(user.ip_address, session.inside_ip_address)
1582 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1583 self.assertTrue(session.protocol in
1584 [IP_PROTOS.tcp, IP_PROTOS.udp,
1588 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1589 self.assertTrue(len(sessions) >= 4)
1590 for session in sessions:
1591 self.assertFalse(session.is_static)
1592 self.assertEqual(session.inside_ip_address[0:4],
1593 self.pg4.remote_ip4n)
1594 self.assertEqual(session.outside_ip_address,
1595 addresses[0].ip_address)
1598 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1599 self.assertTrue(len(sessions) >= 3)
1600 for session in sessions:
1601 self.assertTrue(session.is_static)
1602 self.assertEqual(session.inside_ip_address[0:4],
1603 self.pg6.remote_ip4n)
1604 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1605 map(int, static_nat_ip.split('.')))
1606 self.assertTrue(session.inside_port in
1607 [self.tcp_port_in, self.udp_port_in,
1610 def test_hairpinning(self):
1611 """ NAT44 hairpinning - 1:1 NAPT """
1613 host = self.pg0.remote_hosts[0]
1614 server = self.pg0.remote_hosts[1]
1617 server_in_port = 5678
1618 server_out_port = 8765
1620 self.nat44_add_address(self.nat_addr)
1621 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1622 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1624 # add static mapping for server
1625 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1626 server_in_port, server_out_port,
1627 proto=IP_PROTOS.tcp)
1629 # send packet from host to server
1630 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1631 IP(src=host.ip4, dst=self.nat_addr) /
1632 TCP(sport=host_in_port, dport=server_out_port))
1633 self.pg0.add_stream(p)
1634 self.pg_enable_capture(self.pg_interfaces)
1636 capture = self.pg0.get_capture(1)
1641 self.assertEqual(ip.src, self.nat_addr)
1642 self.assertEqual(ip.dst, server.ip4)
1643 self.assertNotEqual(tcp.sport, host_in_port)
1644 self.assertEqual(tcp.dport, server_in_port)
1645 self.check_tcp_checksum(p)
1646 host_out_port = tcp.sport
1648 self.logger.error(ppp("Unexpected or invalid packet:", p))
1651 # send reply from server to host
1652 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1653 IP(src=server.ip4, dst=self.nat_addr) /
1654 TCP(sport=server_in_port, dport=host_out_port))
1655 self.pg0.add_stream(p)
1656 self.pg_enable_capture(self.pg_interfaces)
1658 capture = self.pg0.get_capture(1)
1663 self.assertEqual(ip.src, self.nat_addr)
1664 self.assertEqual(ip.dst, host.ip4)
1665 self.assertEqual(tcp.sport, server_out_port)
1666 self.assertEqual(tcp.dport, host_in_port)
1667 self.check_tcp_checksum(p)
1669 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1672 def test_hairpinning2(self):
1673 """ NAT44 hairpinning - 1:1 NAT"""
1675 server1_nat_ip = "10.0.0.10"
1676 server2_nat_ip = "10.0.0.11"
1677 host = self.pg0.remote_hosts[0]
1678 server1 = self.pg0.remote_hosts[1]
1679 server2 = self.pg0.remote_hosts[2]
1680 server_tcp_port = 22
1681 server_udp_port = 20
1683 self.nat44_add_address(self.nat_addr)
1684 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1685 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1688 # add static mapping for servers
1689 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1690 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1694 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1695 IP(src=host.ip4, dst=server1_nat_ip) /
1696 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1698 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1699 IP(src=host.ip4, dst=server1_nat_ip) /
1700 UDP(sport=self.udp_port_in, dport=server_udp_port))
1702 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1703 IP(src=host.ip4, dst=server1_nat_ip) /
1704 ICMP(id=self.icmp_id_in, type='echo-request'))
1706 self.pg0.add_stream(pkts)
1707 self.pg_enable_capture(self.pg_interfaces)
1709 capture = self.pg0.get_capture(len(pkts))
1710 for packet in capture:
1712 self.assertEqual(packet[IP].src, self.nat_addr)
1713 self.assertEqual(packet[IP].dst, server1.ip4)
1714 if packet.haslayer(TCP):
1715 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1716 self.assertEqual(packet[TCP].dport, server_tcp_port)
1717 self.tcp_port_out = packet[TCP].sport
1718 self.check_tcp_checksum(packet)
1719 elif packet.haslayer(UDP):
1720 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1721 self.assertEqual(packet[UDP].dport, server_udp_port)
1722 self.udp_port_out = packet[UDP].sport
1724 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1725 self.icmp_id_out = packet[ICMP].id
1727 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1732 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1733 IP(src=server1.ip4, dst=self.nat_addr) /
1734 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1736 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1737 IP(src=server1.ip4, dst=self.nat_addr) /
1738 UDP(sport=server_udp_port, dport=self.udp_port_out))
1740 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1741 IP(src=server1.ip4, dst=self.nat_addr) /
1742 ICMP(id=self.icmp_id_out, type='echo-reply'))
1744 self.pg0.add_stream(pkts)
1745 self.pg_enable_capture(self.pg_interfaces)
1747 capture = self.pg0.get_capture(len(pkts))
1748 for packet in capture:
1750 self.assertEqual(packet[IP].src, server1_nat_ip)
1751 self.assertEqual(packet[IP].dst, host.ip4)
1752 if packet.haslayer(TCP):
1753 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1754 self.assertEqual(packet[TCP].sport, server_tcp_port)
1755 self.check_tcp_checksum(packet)
1756 elif packet.haslayer(UDP):
1757 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1758 self.assertEqual(packet[UDP].sport, server_udp_port)
1760 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1762 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1765 # server2 to server1
1767 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1768 IP(src=server2.ip4, dst=server1_nat_ip) /
1769 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1771 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1772 IP(src=server2.ip4, dst=server1_nat_ip) /
1773 UDP(sport=self.udp_port_in, dport=server_udp_port))
1775 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1776 IP(src=server2.ip4, dst=server1_nat_ip) /
1777 ICMP(id=self.icmp_id_in, type='echo-request'))
1779 self.pg0.add_stream(pkts)
1780 self.pg_enable_capture(self.pg_interfaces)
1782 capture = self.pg0.get_capture(len(pkts))
1783 for packet in capture:
1785 self.assertEqual(packet[IP].src, server2_nat_ip)
1786 self.assertEqual(packet[IP].dst, server1.ip4)
1787 if packet.haslayer(TCP):
1788 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1789 self.assertEqual(packet[TCP].dport, server_tcp_port)
1790 self.tcp_port_out = packet[TCP].sport
1791 self.check_tcp_checksum(packet)
1792 elif packet.haslayer(UDP):
1793 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1794 self.assertEqual(packet[UDP].dport, server_udp_port)
1795 self.udp_port_out = packet[UDP].sport
1797 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1798 self.icmp_id_out = packet[ICMP].id
1800 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1803 # server1 to server2
1805 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1806 IP(src=server1.ip4, dst=server2_nat_ip) /
1807 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1809 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1810 IP(src=server1.ip4, dst=server2_nat_ip) /
1811 UDP(sport=server_udp_port, dport=self.udp_port_out))
1813 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1814 IP(src=server1.ip4, dst=server2_nat_ip) /
1815 ICMP(id=self.icmp_id_out, type='echo-reply'))
1817 self.pg0.add_stream(pkts)
1818 self.pg_enable_capture(self.pg_interfaces)
1820 capture = self.pg0.get_capture(len(pkts))
1821 for packet in capture:
1823 self.assertEqual(packet[IP].src, server1_nat_ip)
1824 self.assertEqual(packet[IP].dst, server2.ip4)
1825 if packet.haslayer(TCP):
1826 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1827 self.assertEqual(packet[TCP].sport, server_tcp_port)
1828 self.check_tcp_checksum(packet)
1829 elif packet.haslayer(UDP):
1830 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1831 self.assertEqual(packet[UDP].sport, server_udp_port)
1833 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1835 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1838 def test_max_translations_per_user(self):
1839 """ MAX translations per user - recycle the least recently used """
1841 self.nat44_add_address(self.nat_addr)
1842 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1843 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1846 # get maximum number of translations per user
1847 nat44_config = self.vapi.nat_show_config()
1849 # send more than maximum number of translations per user packets
1850 pkts_num = nat44_config.max_translations_per_user + 5
1852 for port in range(0, pkts_num):
1853 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1854 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1855 TCP(sport=1025 + port))
1857 self.pg0.add_stream(pkts)
1858 self.pg_enable_capture(self.pg_interfaces)
1861 # verify number of translated packet
1862 self.pg1.get_capture(pkts_num)
1864 def test_interface_addr(self):
1865 """ Acquire NAT44 addresses from interface """
1866 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1868 # no address in NAT pool
1869 adresses = self.vapi.nat44_address_dump()
1870 self.assertEqual(0, len(adresses))
1872 # configure interface address and check NAT address pool
1873 self.pg7.config_ip4()
1874 adresses = self.vapi.nat44_address_dump()
1875 self.assertEqual(1, len(adresses))
1876 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1878 # remove interface address and check NAT address pool
1879 self.pg7.unconfig_ip4()
1880 adresses = self.vapi.nat44_address_dump()
1881 self.assertEqual(0, len(adresses))
1883 def test_interface_addr_static_mapping(self):
1884 """ Static mapping with addresses from interface """
1885 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1886 self.nat44_add_static_mapping(
1888 external_sw_if_index=self.pg7.sw_if_index)
1890 # static mappings with external interface
1891 static_mappings = self.vapi.nat44_static_mapping_dump()
1892 self.assertEqual(1, len(static_mappings))
1893 self.assertEqual(self.pg7.sw_if_index,
1894 static_mappings[0].external_sw_if_index)
1896 # configure interface address and check static mappings
1897 self.pg7.config_ip4()
1898 static_mappings = self.vapi.nat44_static_mapping_dump()
1899 self.assertEqual(1, len(static_mappings))
1900 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1901 self.pg7.local_ip4n)
1902 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1904 # remove interface address and check static mappings
1905 self.pg7.unconfig_ip4()
1906 static_mappings = self.vapi.nat44_static_mapping_dump()
1907 self.assertEqual(0, len(static_mappings))
1909 def test_interface_addr_identity_nat(self):
1910 """ Identity NAT with addresses from interface """
1913 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1914 self.vapi.nat44_add_del_identity_mapping(
1915 sw_if_index=self.pg7.sw_if_index,
1917 protocol=IP_PROTOS.tcp,
1920 # identity mappings with external interface
1921 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1922 self.assertEqual(1, len(identity_mappings))
1923 self.assertEqual(self.pg7.sw_if_index,
1924 identity_mappings[0].sw_if_index)
1926 # configure interface address and check identity mappings
1927 self.pg7.config_ip4()
1928 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1929 self.assertEqual(1, len(identity_mappings))
1930 self.assertEqual(identity_mappings[0].ip_address,
1931 self.pg7.local_ip4n)
1932 self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
1933 self.assertEqual(port, identity_mappings[0].port)
1934 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
1936 # remove interface address and check identity mappings
1937 self.pg7.unconfig_ip4()
1938 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1939 self.assertEqual(0, len(identity_mappings))
1941 def test_ipfix_nat44_sess(self):
1942 """ IPFIX logging NAT44 session created/delted """
1943 self.ipfix_domain_id = 10
1944 self.ipfix_src_port = 20202
1945 colector_port = 30303
1946 bind_layers(UDP, IPFIX, dport=30303)
1947 self.nat44_add_address(self.nat_addr)
1948 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1949 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1951 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1952 src_address=self.pg3.local_ip4n,
1954 template_interval=10,
1955 collector_port=colector_port)
1956 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1957 src_port=self.ipfix_src_port)
1959 pkts = self.create_stream_in(self.pg0, self.pg1)
1960 self.pg0.add_stream(pkts)
1961 self.pg_enable_capture(self.pg_interfaces)
1963 capture = self.pg1.get_capture(len(pkts))
1964 self.verify_capture_out(capture)
1965 self.nat44_add_address(self.nat_addr, is_add=0)
1966 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1967 capture = self.pg3.get_capture(3)
1968 ipfix = IPFIXDecoder()
1969 # first load template
1971 self.assertTrue(p.haslayer(IPFIX))
1972 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1973 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1974 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1975 self.assertEqual(p[UDP].dport, colector_port)
1976 self.assertEqual(p[IPFIX].observationDomainID,
1977 self.ipfix_domain_id)
1978 if p.haslayer(Template):
1979 ipfix.add_template(p.getlayer(Template))
1980 # verify events in data set
1982 if p.haslayer(Data):
1983 data = ipfix.decode_data_set(p.getlayer(Set))
1984 self.verify_ipfix_nat44_ses(data)
1986 def test_ipfix_addr_exhausted(self):
1987 """ IPFIX logging NAT addresses exhausted """
1988 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1989 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1991 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1992 src_address=self.pg3.local_ip4n,
1994 template_interval=10)
1995 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1996 src_port=self.ipfix_src_port)
1998 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1999 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2001 self.pg0.add_stream(p)
2002 self.pg_enable_capture(self.pg_interfaces)
2004 capture = self.pg1.get_capture(0)
2005 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2006 capture = self.pg3.get_capture(3)
2007 ipfix = IPFIXDecoder()
2008 # first load template
2010 self.assertTrue(p.haslayer(IPFIX))
2011 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2012 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2013 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2014 self.assertEqual(p[UDP].dport, 4739)
2015 self.assertEqual(p[IPFIX].observationDomainID,
2016 self.ipfix_domain_id)
2017 if p.haslayer(Template):
2018 ipfix.add_template(p.getlayer(Template))
2019 # verify events in data set
2021 if p.haslayer(Data):
2022 data = ipfix.decode_data_set(p.getlayer(Set))
2023 self.verify_ipfix_addr_exhausted(data)
2025 def test_pool_addr_fib(self):
2026 """ NAT44 add pool addresses to FIB """
2027 static_addr = '10.0.0.10'
2028 self.nat44_add_address(self.nat_addr)
2029 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2030 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2032 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2035 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2036 ARP(op=ARP.who_has, pdst=self.nat_addr,
2037 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2038 self.pg1.add_stream(p)
2039 self.pg_enable_capture(self.pg_interfaces)
2041 capture = self.pg1.get_capture(1)
2042 self.assertTrue(capture[0].haslayer(ARP))
2043 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2046 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2047 ARP(op=ARP.who_has, pdst=static_addr,
2048 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2049 self.pg1.add_stream(p)
2050 self.pg_enable_capture(self.pg_interfaces)
2052 capture = self.pg1.get_capture(1)
2053 self.assertTrue(capture[0].haslayer(ARP))
2054 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2056 # send ARP to non-NAT44 interface
2057 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2058 ARP(op=ARP.who_has, pdst=self.nat_addr,
2059 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2060 self.pg2.add_stream(p)
2061 self.pg_enable_capture(self.pg_interfaces)
2063 capture = self.pg1.get_capture(0)
2065 # remove addresses and verify
2066 self.nat44_add_address(self.nat_addr, is_add=0)
2067 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2070 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2071 ARP(op=ARP.who_has, pdst=self.nat_addr,
2072 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2073 self.pg1.add_stream(p)
2074 self.pg_enable_capture(self.pg_interfaces)
2076 capture = self.pg1.get_capture(0)
2078 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2079 ARP(op=ARP.who_has, pdst=static_addr,
2080 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2081 self.pg1.add_stream(p)
2082 self.pg_enable_capture(self.pg_interfaces)
2084 capture = self.pg1.get_capture(0)
2086 def test_vrf_mode(self):
2087 """ NAT44 tenant VRF aware address pool mode """
2091 nat_ip1 = "10.0.0.10"
2092 nat_ip2 = "10.0.0.11"
2094 self.pg0.unconfig_ip4()
2095 self.pg1.unconfig_ip4()
2096 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2097 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2098 self.pg0.set_table_ip4(vrf_id1)
2099 self.pg1.set_table_ip4(vrf_id2)
2100 self.pg0.config_ip4()
2101 self.pg1.config_ip4()
2103 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2104 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2105 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2106 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2107 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2111 pkts = self.create_stream_in(self.pg0, self.pg2)
2112 self.pg0.add_stream(pkts)
2113 self.pg_enable_capture(self.pg_interfaces)
2115 capture = self.pg2.get_capture(len(pkts))
2116 self.verify_capture_out(capture, nat_ip1)
2119 pkts = self.create_stream_in(self.pg1, self.pg2)
2120 self.pg1.add_stream(pkts)
2121 self.pg_enable_capture(self.pg_interfaces)
2123 capture = self.pg2.get_capture(len(pkts))
2124 self.verify_capture_out(capture, nat_ip2)
2126 self.pg0.unconfig_ip4()
2127 self.pg1.unconfig_ip4()
2128 self.pg0.set_table_ip4(0)
2129 self.pg1.set_table_ip4(0)
2130 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2131 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2133 def test_vrf_feature_independent(self):
2134 """ NAT44 tenant VRF independent address pool mode """
2136 nat_ip1 = "10.0.0.10"
2137 nat_ip2 = "10.0.0.11"
2139 self.nat44_add_address(nat_ip1)
2140 self.nat44_add_address(nat_ip2, vrf_id=99)
2141 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2142 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2143 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2147 pkts = self.create_stream_in(self.pg0, self.pg2)
2148 self.pg0.add_stream(pkts)
2149 self.pg_enable_capture(self.pg_interfaces)
2151 capture = self.pg2.get_capture(len(pkts))
2152 self.verify_capture_out(capture, nat_ip1)
2155 pkts = self.create_stream_in(self.pg1, self.pg2)
2156 self.pg1.add_stream(pkts)
2157 self.pg_enable_capture(self.pg_interfaces)
2159 capture = self.pg2.get_capture(len(pkts))
2160 self.verify_capture_out(capture, nat_ip1)
2162 def test_dynamic_ipless_interfaces(self):
2163 """ NAT44 interfaces without configured IP address """
2165 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2166 mactobinary(self.pg7.remote_mac),
2167 self.pg7.remote_ip4n,
2169 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2170 mactobinary(self.pg8.remote_mac),
2171 self.pg8.remote_ip4n,
2174 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2175 dst_address_length=32,
2176 next_hop_address=self.pg7.remote_ip4n,
2177 next_hop_sw_if_index=self.pg7.sw_if_index)
2178 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2179 dst_address_length=32,
2180 next_hop_address=self.pg8.remote_ip4n,
2181 next_hop_sw_if_index=self.pg8.sw_if_index)
2183 self.nat44_add_address(self.nat_addr)
2184 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2185 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2189 pkts = self.create_stream_in(self.pg7, self.pg8)
2190 self.pg7.add_stream(pkts)
2191 self.pg_enable_capture(self.pg_interfaces)
2193 capture = self.pg8.get_capture(len(pkts))
2194 self.verify_capture_out(capture)
2197 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2198 self.pg8.add_stream(pkts)
2199 self.pg_enable_capture(self.pg_interfaces)
2201 capture = self.pg7.get_capture(len(pkts))
2202 self.verify_capture_in(capture, self.pg7)
2204 def test_static_ipless_interfaces(self):
2205 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2207 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2208 mactobinary(self.pg7.remote_mac),
2209 self.pg7.remote_ip4n,
2211 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2212 mactobinary(self.pg8.remote_mac),
2213 self.pg8.remote_ip4n,
2216 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2217 dst_address_length=32,
2218 next_hop_address=self.pg7.remote_ip4n,
2219 next_hop_sw_if_index=self.pg7.sw_if_index)
2220 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2221 dst_address_length=32,
2222 next_hop_address=self.pg8.remote_ip4n,
2223 next_hop_sw_if_index=self.pg8.sw_if_index)
2225 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2226 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2227 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2231 pkts = self.create_stream_out(self.pg8)
2232 self.pg8.add_stream(pkts)
2233 self.pg_enable_capture(self.pg_interfaces)
2235 capture = self.pg7.get_capture(len(pkts))
2236 self.verify_capture_in(capture, self.pg7)
2239 pkts = self.create_stream_in(self.pg7, self.pg8)
2240 self.pg7.add_stream(pkts)
2241 self.pg_enable_capture(self.pg_interfaces)
2243 capture = self.pg8.get_capture(len(pkts))
2244 self.verify_capture_out(capture, self.nat_addr, True)
2246 def test_static_with_port_ipless_interfaces(self):
2247 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2249 self.tcp_port_out = 30606
2250 self.udp_port_out = 30607
2251 self.icmp_id_out = 30608
2253 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2254 mactobinary(self.pg7.remote_mac),
2255 self.pg7.remote_ip4n,
2257 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2258 mactobinary(self.pg8.remote_mac),
2259 self.pg8.remote_ip4n,
2262 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2263 dst_address_length=32,
2264 next_hop_address=self.pg7.remote_ip4n,
2265 next_hop_sw_if_index=self.pg7.sw_if_index)
2266 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2267 dst_address_length=32,
2268 next_hop_address=self.pg8.remote_ip4n,
2269 next_hop_sw_if_index=self.pg8.sw_if_index)
2271 self.nat44_add_address(self.nat_addr)
2272 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2273 self.tcp_port_in, self.tcp_port_out,
2274 proto=IP_PROTOS.tcp)
2275 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2276 self.udp_port_in, self.udp_port_out,
2277 proto=IP_PROTOS.udp)
2278 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2279 self.icmp_id_in, self.icmp_id_out,
2280 proto=IP_PROTOS.icmp)
2281 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2282 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2286 pkts = self.create_stream_out(self.pg8)
2287 self.pg8.add_stream(pkts)
2288 self.pg_enable_capture(self.pg_interfaces)
2290 capture = self.pg7.get_capture(len(pkts))
2291 self.verify_capture_in(capture, self.pg7)
2294 pkts = self.create_stream_in(self.pg7, self.pg8)
2295 self.pg7.add_stream(pkts)
2296 self.pg_enable_capture(self.pg_interfaces)
2298 capture = self.pg8.get_capture(len(pkts))
2299 self.verify_capture_out(capture)
2301 def test_static_unknown_proto(self):
2302 """ 1:1 NAT translate packet with unknown protocol """
2303 nat_ip = "10.0.0.10"
2304 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2305 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2306 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2310 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2311 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2313 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2314 TCP(sport=1234, dport=1234))
2315 self.pg0.add_stream(p)
2316 self.pg_enable_capture(self.pg_interfaces)
2318 p = self.pg1.get_capture(1)
2321 self.assertEqual(packet[IP].src, nat_ip)
2322 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2323 self.assertTrue(packet.haslayer(GRE))
2324 self.check_ip_checksum(packet)
2326 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2330 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2331 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2333 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2334 TCP(sport=1234, dport=1234))
2335 self.pg1.add_stream(p)
2336 self.pg_enable_capture(self.pg_interfaces)
2338 p = self.pg0.get_capture(1)
2341 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2342 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2343 self.assertTrue(packet.haslayer(GRE))
2344 self.check_ip_checksum(packet)
2346 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2349 def test_hairpinning_static_unknown_proto(self):
2350 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2352 host = self.pg0.remote_hosts[0]
2353 server = self.pg0.remote_hosts[1]
2355 host_nat_ip = "10.0.0.10"
2356 server_nat_ip = "10.0.0.11"
2358 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2359 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2360 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2361 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2365 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2366 IP(src=host.ip4, dst=server_nat_ip) /
2368 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2369 TCP(sport=1234, dport=1234))
2370 self.pg0.add_stream(p)
2371 self.pg_enable_capture(self.pg_interfaces)
2373 p = self.pg0.get_capture(1)
2376 self.assertEqual(packet[IP].src, host_nat_ip)
2377 self.assertEqual(packet[IP].dst, server.ip4)
2378 self.assertTrue(packet.haslayer(GRE))
2379 self.check_ip_checksum(packet)
2381 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2385 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2386 IP(src=server.ip4, dst=host_nat_ip) /
2388 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2389 TCP(sport=1234, dport=1234))
2390 self.pg0.add_stream(p)
2391 self.pg_enable_capture(self.pg_interfaces)
2393 p = self.pg0.get_capture(1)
2396 self.assertEqual(packet[IP].src, server_nat_ip)
2397 self.assertEqual(packet[IP].dst, host.ip4)
2398 self.assertTrue(packet.haslayer(GRE))
2399 self.check_ip_checksum(packet)
2401 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2404 def test_unknown_proto(self):
2405 """ NAT44 translate packet with unknown protocol """
2406 self.nat44_add_address(self.nat_addr)
2407 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2408 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2412 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2413 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2414 TCP(sport=self.tcp_port_in, dport=20))
2415 self.pg0.add_stream(p)
2416 self.pg_enable_capture(self.pg_interfaces)
2418 p = self.pg1.get_capture(1)
2420 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2421 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2423 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2424 TCP(sport=1234, dport=1234))
2425 self.pg0.add_stream(p)
2426 self.pg_enable_capture(self.pg_interfaces)
2428 p = self.pg1.get_capture(1)
2431 self.assertEqual(packet[IP].src, self.nat_addr)
2432 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2433 self.assertTrue(packet.haslayer(GRE))
2434 self.check_ip_checksum(packet)
2436 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2440 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2441 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2443 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2444 TCP(sport=1234, dport=1234))
2445 self.pg1.add_stream(p)
2446 self.pg_enable_capture(self.pg_interfaces)
2448 p = self.pg0.get_capture(1)
2451 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2452 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2453 self.assertTrue(packet.haslayer(GRE))
2454 self.check_ip_checksum(packet)
2456 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2459 def test_hairpinning_unknown_proto(self):
2460 """ NAT44 translate packet with unknown protocol - hairpinning """
2461 host = self.pg0.remote_hosts[0]
2462 server = self.pg0.remote_hosts[1]
2465 server_in_port = 5678
2466 server_out_port = 8765
2467 server_nat_ip = "10.0.0.11"
2469 self.nat44_add_address(self.nat_addr)
2470 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2471 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2474 # add static mapping for server
2475 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2478 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2479 IP(src=host.ip4, dst=server_nat_ip) /
2480 TCP(sport=host_in_port, dport=server_out_port))
2481 self.pg0.add_stream(p)
2482 self.pg_enable_capture(self.pg_interfaces)
2484 capture = self.pg0.get_capture(1)
2486 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2487 IP(src=host.ip4, dst=server_nat_ip) /
2489 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2490 TCP(sport=1234, dport=1234))
2491 self.pg0.add_stream(p)
2492 self.pg_enable_capture(self.pg_interfaces)
2494 p = self.pg0.get_capture(1)
2497 self.assertEqual(packet[IP].src, self.nat_addr)
2498 self.assertEqual(packet[IP].dst, server.ip4)
2499 self.assertTrue(packet.haslayer(GRE))
2500 self.check_ip_checksum(packet)
2502 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2506 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2507 IP(src=server.ip4, dst=self.nat_addr) /
2509 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2510 TCP(sport=1234, dport=1234))
2511 self.pg0.add_stream(p)
2512 self.pg_enable_capture(self.pg_interfaces)
2514 p = self.pg0.get_capture(1)
2517 self.assertEqual(packet[IP].src, server_nat_ip)
2518 self.assertEqual(packet[IP].dst, host.ip4)
2519 self.assertTrue(packet.haslayer(GRE))
2520 self.check_ip_checksum(packet)
2522 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2525 def test_output_feature(self):
2526 """ NAT44 interface output feature (in2out postrouting) """
2527 self.nat44_add_address(self.nat_addr)
2528 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2529 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2530 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2534 pkts = self.create_stream_in(self.pg0, self.pg3)
2535 self.pg0.add_stream(pkts)
2536 self.pg_enable_capture(self.pg_interfaces)
2538 capture = self.pg3.get_capture(len(pkts))
2539 self.verify_capture_out(capture)
2542 pkts = self.create_stream_out(self.pg3)
2543 self.pg3.add_stream(pkts)
2544 self.pg_enable_capture(self.pg_interfaces)
2546 capture = self.pg0.get_capture(len(pkts))
2547 self.verify_capture_in(capture, self.pg0)
2549 # from non-NAT interface to NAT inside interface
2550 pkts = self.create_stream_in(self.pg2, self.pg0)
2551 self.pg2.add_stream(pkts)
2552 self.pg_enable_capture(self.pg_interfaces)
2554 capture = self.pg0.get_capture(len(pkts))
2555 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2557 def test_output_feature_vrf_aware(self):
2558 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2559 nat_ip_vrf10 = "10.0.0.10"
2560 nat_ip_vrf20 = "10.0.0.20"
2562 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2563 dst_address_length=32,
2564 next_hop_address=self.pg3.remote_ip4n,
2565 next_hop_sw_if_index=self.pg3.sw_if_index,
2567 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2568 dst_address_length=32,
2569 next_hop_address=self.pg3.remote_ip4n,
2570 next_hop_sw_if_index=self.pg3.sw_if_index,
2573 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2574 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2575 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2576 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2577 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2581 pkts = self.create_stream_in(self.pg4, self.pg3)
2582 self.pg4.add_stream(pkts)
2583 self.pg_enable_capture(self.pg_interfaces)
2585 capture = self.pg3.get_capture(len(pkts))
2586 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2589 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2590 self.pg3.add_stream(pkts)
2591 self.pg_enable_capture(self.pg_interfaces)
2593 capture = self.pg4.get_capture(len(pkts))
2594 self.verify_capture_in(capture, self.pg4)
2597 pkts = self.create_stream_in(self.pg6, self.pg3)
2598 self.pg6.add_stream(pkts)
2599 self.pg_enable_capture(self.pg_interfaces)
2601 capture = self.pg3.get_capture(len(pkts))
2602 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2605 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2606 self.pg3.add_stream(pkts)
2607 self.pg_enable_capture(self.pg_interfaces)
2609 capture = self.pg6.get_capture(len(pkts))
2610 self.verify_capture_in(capture, self.pg6)
2612 def test_output_feature_hairpinning(self):
2613 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2614 host = self.pg0.remote_hosts[0]
2615 server = self.pg0.remote_hosts[1]
2618 server_in_port = 5678
2619 server_out_port = 8765
2621 self.nat44_add_address(self.nat_addr)
2622 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2623 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2626 # add static mapping for server
2627 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2628 server_in_port, server_out_port,
2629 proto=IP_PROTOS.tcp)
2631 # send packet from host to server
2632 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2633 IP(src=host.ip4, dst=self.nat_addr) /
2634 TCP(sport=host_in_port, dport=server_out_port))
2635 self.pg0.add_stream(p)
2636 self.pg_enable_capture(self.pg_interfaces)
2638 capture = self.pg0.get_capture(1)
2643 self.assertEqual(ip.src, self.nat_addr)
2644 self.assertEqual(ip.dst, server.ip4)
2645 self.assertNotEqual(tcp.sport, host_in_port)
2646 self.assertEqual(tcp.dport, server_in_port)
2647 self.check_tcp_checksum(p)
2648 host_out_port = tcp.sport
2650 self.logger.error(ppp("Unexpected or invalid packet:", p))
2653 # send reply from server to host
2654 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2655 IP(src=server.ip4, dst=self.nat_addr) /
2656 TCP(sport=server_in_port, dport=host_out_port))
2657 self.pg0.add_stream(p)
2658 self.pg_enable_capture(self.pg_interfaces)
2660 capture = self.pg0.get_capture(1)
2665 self.assertEqual(ip.src, self.nat_addr)
2666 self.assertEqual(ip.dst, host.ip4)
2667 self.assertEqual(tcp.sport, server_out_port)
2668 self.assertEqual(tcp.dport, host_in_port)
2669 self.check_tcp_checksum(p)
2671 self.logger.error(ppp("Unexpected or invalid packet:"), p)
2674 def test_one_armed_nat44(self):
2675 """ One armed NAT44 """
2676 remote_host = self.pg9.remote_hosts[0]
2677 local_host = self.pg9.remote_hosts[1]
2680 self.nat44_add_address(self.nat_addr)
2681 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2682 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2686 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2687 IP(src=local_host.ip4, dst=remote_host.ip4) /
2688 TCP(sport=12345, dport=80))
2689 self.pg9.add_stream(p)
2690 self.pg_enable_capture(self.pg_interfaces)
2692 capture = self.pg9.get_capture(1)
2697 self.assertEqual(ip.src, self.nat_addr)
2698 self.assertEqual(ip.dst, remote_host.ip4)
2699 self.assertNotEqual(tcp.sport, 12345)
2700 external_port = tcp.sport
2701 self.assertEqual(tcp.dport, 80)
2702 self.check_tcp_checksum(p)
2703 self.check_ip_checksum(p)
2705 self.logger.error(ppp("Unexpected or invalid packet:", p))
2709 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2710 IP(src=remote_host.ip4, dst=self.nat_addr) /
2711 TCP(sport=80, dport=external_port))
2712 self.pg9.add_stream(p)
2713 self.pg_enable_capture(self.pg_interfaces)
2715 capture = self.pg9.get_capture(1)
2720 self.assertEqual(ip.src, remote_host.ip4)
2721 self.assertEqual(ip.dst, local_host.ip4)
2722 self.assertEqual(tcp.sport, 80)
2723 self.assertEqual(tcp.dport, 12345)
2724 self.check_tcp_checksum(p)
2725 self.check_ip_checksum(p)
2727 self.logger.error(ppp("Unexpected or invalid packet:", p))
2730 def test_del_session(self):
2731 """ Delete NAT44 session """
2732 self.nat44_add_address(self.nat_addr)
2733 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2734 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2737 pkts = self.create_stream_in(self.pg0, self.pg1)
2738 self.pg0.add_stream(pkts)
2739 self.pg_enable_capture(self.pg_interfaces)
2741 capture = self.pg1.get_capture(len(pkts))
2743 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2744 nsessions = len(sessions)
2746 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2747 sessions[0].inside_port,
2748 sessions[0].protocol)
2749 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2750 sessions[1].outside_port,
2751 sessions[1].protocol,
2754 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2755 self.assertEqual(nsessions - len(sessions), 2)
2757 def test_set_get_reass(self):
2758 """ NAT44 set/get virtual fragmentation reassembly """
2759 reas_cfg1 = self.vapi.nat_get_reass()
2761 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2762 max_reass=reas_cfg1.ip4_max_reass * 2,
2763 max_frag=reas_cfg1.ip4_max_frag * 2)
2765 reas_cfg2 = self.vapi.nat_get_reass()
2767 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2768 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2769 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2771 self.vapi.nat_set_reass(drop_frag=1)
2772 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2774 def test_frag_in_order(self):
2775 """ NAT44 translate fragments arriving in order """
2776 self.nat44_add_address(self.nat_addr)
2777 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2778 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2781 data = "A" * 4 + "B" * 16 + "C" * 3
2782 self.tcp_port_in = random.randint(1025, 65535)
2784 reass = self.vapi.nat_reass_dump()
2785 reass_n_start = len(reass)
2788 pkts = self.create_stream_frag(self.pg0,
2789 self.pg1.remote_ip4,
2793 self.pg0.add_stream(pkts)
2794 self.pg_enable_capture(self.pg_interfaces)
2796 frags = self.pg1.get_capture(len(pkts))
2797 p = self.reass_frags_and_verify(frags,
2799 self.pg1.remote_ip4)
2800 self.assertEqual(p[TCP].dport, 20)
2801 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2802 self.tcp_port_out = p[TCP].sport
2803 self.assertEqual(data, p[Raw].load)
2806 pkts = self.create_stream_frag(self.pg1,
2811 self.pg1.add_stream(pkts)
2812 self.pg_enable_capture(self.pg_interfaces)
2814 frags = self.pg0.get_capture(len(pkts))
2815 p = self.reass_frags_and_verify(frags,
2816 self.pg1.remote_ip4,
2817 self.pg0.remote_ip4)
2818 self.assertEqual(p[TCP].sport, 20)
2819 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2820 self.assertEqual(data, p[Raw].load)
2822 reass = self.vapi.nat_reass_dump()
2823 reass_n_end = len(reass)
2825 self.assertEqual(reass_n_end - reass_n_start, 2)
2827 def test_reass_hairpinning(self):
2828 """ NAT44 fragments hairpinning """
2829 host = self.pg0.remote_hosts[0]
2830 server = self.pg0.remote_hosts[1]
2831 host_in_port = random.randint(1025, 65535)
2833 server_in_port = random.randint(1025, 65535)
2834 server_out_port = random.randint(1025, 65535)
2835 data = "A" * 4 + "B" * 16 + "C" * 3
2837 self.nat44_add_address(self.nat_addr)
2838 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2839 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2841 # add static mapping for server
2842 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2843 server_in_port, server_out_port,
2844 proto=IP_PROTOS.tcp)
2846 # send packet from host to server
2847 pkts = self.create_stream_frag(self.pg0,
2852 self.pg0.add_stream(pkts)
2853 self.pg_enable_capture(self.pg_interfaces)
2855 frags = self.pg0.get_capture(len(pkts))
2856 p = self.reass_frags_and_verify(frags,
2859 self.assertNotEqual(p[TCP].sport, host_in_port)
2860 self.assertEqual(p[TCP].dport, server_in_port)
2861 self.assertEqual(data, p[Raw].load)
2863 def test_frag_out_of_order(self):
2864 """ NAT44 translate fragments arriving out of order """
2865 self.nat44_add_address(self.nat_addr)
2866 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2867 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2870 data = "A" * 4 + "B" * 16 + "C" * 3
2871 random.randint(1025, 65535)
2874 pkts = self.create_stream_frag(self.pg0,
2875 self.pg1.remote_ip4,
2880 self.pg0.add_stream(pkts)
2881 self.pg_enable_capture(self.pg_interfaces)
2883 frags = self.pg1.get_capture(len(pkts))
2884 p = self.reass_frags_and_verify(frags,
2886 self.pg1.remote_ip4)
2887 self.assertEqual(p[TCP].dport, 20)
2888 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2889 self.tcp_port_out = p[TCP].sport
2890 self.assertEqual(data, p[Raw].load)
2893 pkts = self.create_stream_frag(self.pg1,
2899 self.pg1.add_stream(pkts)
2900 self.pg_enable_capture(self.pg_interfaces)
2902 frags = self.pg0.get_capture(len(pkts))
2903 p = self.reass_frags_and_verify(frags,
2904 self.pg1.remote_ip4,
2905 self.pg0.remote_ip4)
2906 self.assertEqual(p[TCP].sport, 20)
2907 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2908 self.assertEqual(data, p[Raw].load)
2910 def test_port_restricted(self):
2911 """ Port restricted NAT44 (MAP-E CE) """
2912 self.nat44_add_address(self.nat_addr)
2913 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2914 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2916 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
2917 "psid-offset 6 psid-len 6")
2919 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2920 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2921 TCP(sport=4567, dport=22))
2922 self.pg0.add_stream(p)
2923 self.pg_enable_capture(self.pg_interfaces)
2925 capture = self.pg1.get_capture(1)
2930 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2931 self.assertEqual(ip.src, self.nat_addr)
2932 self.assertEqual(tcp.dport, 22)
2933 self.assertNotEqual(tcp.sport, 4567)
2934 self.assertEqual((tcp.sport >> 6) & 63, 10)
2935 self.check_tcp_checksum(p)
2936 self.check_ip_checksum(p)
2938 self.logger.error(ppp("Unexpected or invalid packet:", p))
2941 def test_twice_nat(self):
2943 twice_nat_addr = '10.0.1.3'
2948 self.nat44_add_address(self.nat_addr)
2949 self.nat44_add_address(twice_nat_addr, twice_nat=1)
2950 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2951 port_in, port_out, proto=IP_PROTOS.tcp,
2953 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2954 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2957 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2958 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2959 TCP(sport=eh_port_out, dport=port_out))
2960 self.pg1.add_stream(p)
2961 self.pg_enable_capture(self.pg_interfaces)
2963 capture = self.pg0.get_capture(1)
2968 self.assertEqual(ip.dst, self.pg0.remote_ip4)
2969 self.assertEqual(ip.src, twice_nat_addr)
2970 self.assertEqual(tcp.dport, port_in)
2971 self.assertNotEqual(tcp.sport, eh_port_out)
2972 eh_port_in = tcp.sport
2973 self.check_tcp_checksum(p)
2974 self.check_ip_checksum(p)
2976 self.logger.error(ppp("Unexpected or invalid packet:", p))
2979 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2980 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
2981 TCP(sport=port_in, dport=eh_port_in))
2982 self.pg0.add_stream(p)
2983 self.pg_enable_capture(self.pg_interfaces)
2985 capture = self.pg1.get_capture(1)
2990 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2991 self.assertEqual(ip.src, self.nat_addr)
2992 self.assertEqual(tcp.dport, eh_port_out)
2993 self.assertEqual(tcp.sport, port_out)
2994 self.check_tcp_checksum(p)
2995 self.check_ip_checksum(p)
2997 self.logger.error(ppp("Unexpected or invalid packet:", p))
3000 def test_twice_nat_lb(self):
3001 """ Twice NAT44 local service load balancing """
3002 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3003 twice_nat_addr = '10.0.1.3'
3008 server1 = self.pg0.remote_hosts[0]
3009 server2 = self.pg0.remote_hosts[1]
3011 locals = [{'addr': server1.ip4n,
3014 {'addr': server2.ip4n,
3018 self.nat44_add_address(self.nat_addr)
3019 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3021 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3025 local_num=len(locals),
3027 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3028 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3031 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3032 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3033 TCP(sport=eh_port_out, dport=external_port))
3034 self.pg1.add_stream(p)
3035 self.pg_enable_capture(self.pg_interfaces)
3037 capture = self.pg0.get_capture(1)
3043 self.assertEqual(ip.src, twice_nat_addr)
3044 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3045 if ip.dst == server1.ip4:
3049 self.assertNotEqual(tcp.sport, eh_port_out)
3050 eh_port_in = tcp.sport
3051 self.assertEqual(tcp.dport, local_port)
3052 self.check_tcp_checksum(p)
3053 self.check_ip_checksum(p)
3055 self.logger.error(ppp("Unexpected or invalid packet:", p))
3058 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3059 IP(src=server.ip4, dst=twice_nat_addr) /
3060 TCP(sport=local_port, dport=eh_port_in))
3061 self.pg0.add_stream(p)
3062 self.pg_enable_capture(self.pg_interfaces)
3064 capture = self.pg1.get_capture(1)
3069 self.assertEqual(ip.src, self.nat_addr)
3070 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3071 self.assertEqual(tcp.sport, external_port)
3072 self.assertEqual(tcp.dport, eh_port_out)
3073 self.check_tcp_checksum(p)
3074 self.check_ip_checksum(p)
3076 self.logger.error(ppp("Unexpected or invalid packet:", p))
3079 def test_twice_nat_interface_addr(self):
3080 """ Acquire twice NAT44 addresses from interface """
3081 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3083 # no address in NAT pool
3084 adresses = self.vapi.nat44_address_dump()
3085 self.assertEqual(0, len(adresses))
3087 # configure interface address and check NAT address pool
3088 self.pg7.config_ip4()
3089 adresses = self.vapi.nat44_address_dump()
3090 self.assertEqual(1, len(adresses))
3091 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3092 self.assertEqual(adresses[0].twice_nat, 1)
3094 # remove interface address and check NAT address pool
3095 self.pg7.unconfig_ip4()
3096 adresses = self.vapi.nat44_address_dump()
3097 self.assertEqual(0, len(adresses))
3100 super(TestNAT44, self).tearDown()
3101 if not self.vpp_dead:
3102 self.logger.info(self.vapi.cli("show nat44 verbose"))
3103 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3104 self.vapi.cli("nat addr-port-assignment-alg default")
3108 class TestDeterministicNAT(MethodHolder):
3109 """ Deterministic NAT Test Cases """
3112 def setUpConstants(cls):
3113 super(TestDeterministicNAT, cls).setUpConstants()
3114 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
3117 def setUpClass(cls):
3118 super(TestDeterministicNAT, cls).setUpClass()
3121 cls.tcp_port_in = 6303
3122 cls.tcp_external_port = 6303
3123 cls.udp_port_in = 6304
3124 cls.udp_external_port = 6304
3125 cls.icmp_id_in = 6305
3126 cls.nat_addr = '10.0.0.3'
3128 cls.create_pg_interfaces(range(3))
3129 cls.interfaces = list(cls.pg_interfaces)
3131 for i in cls.interfaces:
3136 cls.pg0.generate_remote_hosts(2)
3137 cls.pg0.configure_ipv4_neighbors()
3140 super(TestDeterministicNAT, cls).tearDownClass()
3143 def create_stream_in(self, in_if, out_if, ttl=64):
3145 Create packet stream for inside network
3147 :param in_if: Inside interface
3148 :param out_if: Outside interface
3149 :param ttl: TTL of generated packets
3153 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3154 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3155 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3159 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3160 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3161 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3165 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3166 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3167 ICMP(id=self.icmp_id_in, type='echo-request'))
3172 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3174 Create packet stream for outside network
3176 :param out_if: Outside interface
3177 :param dst_ip: Destination IP address (Default use global NAT address)
3178 :param ttl: TTL of generated packets
3181 dst_ip = self.nat_addr
3184 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3185 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3186 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3190 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3191 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3192 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3196 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3197 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3198 ICMP(id=self.icmp_external_id, type='echo-reply'))
3203 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3205 Verify captured packets on outside network
3207 :param capture: Captured packets
3208 :param nat_ip: Translated IP address (Default use global NAT address)
3209 :param same_port: Sorce port number is not translated (Default False)
3210 :param packet_num: Expected number of packets (Default 3)
3213 nat_ip = self.nat_addr
3214 self.assertEqual(packet_num, len(capture))
3215 for packet in capture:
3217 self.assertEqual(packet[IP].src, nat_ip)
3218 if packet.haslayer(TCP):
3219 self.tcp_port_out = packet[TCP].sport
3220 elif packet.haslayer(UDP):
3221 self.udp_port_out = packet[UDP].sport
3223 self.icmp_external_id = packet[ICMP].id
3225 self.logger.error(ppp("Unexpected or invalid packet "
3226 "(outside network):", packet))
3229 def initiate_tcp_session(self, in_if, out_if):
3231 Initiates TCP session
3233 :param in_if: Inside interface
3234 :param out_if: Outside interface
3237 # SYN packet in->out
3238 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3239 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3240 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3243 self.pg_enable_capture(self.pg_interfaces)
3245 capture = out_if.get_capture(1)
3247 self.tcp_port_out = p[TCP].sport
3249 # SYN + ACK packet out->in
3250 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3251 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3252 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3254 out_if.add_stream(p)
3255 self.pg_enable_capture(self.pg_interfaces)
3257 in_if.get_capture(1)
3259 # ACK packet in->out
3260 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3261 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3262 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3265 self.pg_enable_capture(self.pg_interfaces)
3267 out_if.get_capture(1)
3270 self.logger.error("TCP 3 way handshake failed")
3273 def verify_ipfix_max_entries_per_user(self, data):
3275 Verify IPFIX maximum entries per user exceeded event
3277 :param data: Decoded IPFIX data records
3279 self.assertEqual(1, len(data))
3282 self.assertEqual(ord(record[230]), 13)
3283 # natQuotaExceededEvent
3284 self.assertEqual('\x03\x00\x00\x00', record[466])
3286 self.assertEqual(self.pg0.remote_ip4n, record[8])
3288 def test_deterministic_mode(self):
3289 """ NAT plugin run deterministic mode """
3290 in_addr = '172.16.255.0'
3291 out_addr = '172.17.255.50'
3292 in_addr_t = '172.16.255.20'
3293 in_addr_n = socket.inet_aton(in_addr)
3294 out_addr_n = socket.inet_aton(out_addr)
3295 in_addr_t_n = socket.inet_aton(in_addr_t)
3299 nat_config = self.vapi.nat_show_config()
3300 self.assertEqual(1, nat_config.deterministic)
3302 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
3304 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
3305 self.assertEqual(rep1.out_addr[:4], out_addr_n)
3306 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
3307 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
3309 deterministic_mappings = self.vapi.nat_det_map_dump()
3310 self.assertEqual(len(deterministic_mappings), 1)
3311 dsm = deterministic_mappings[0]
3312 self.assertEqual(in_addr_n, dsm.in_addr[:4])
3313 self.assertEqual(in_plen, dsm.in_plen)
3314 self.assertEqual(out_addr_n, dsm.out_addr[:4])
3315 self.assertEqual(out_plen, dsm.out_plen)
3317 self.clear_nat_det()
3318 deterministic_mappings = self.vapi.nat_det_map_dump()
3319 self.assertEqual(len(deterministic_mappings), 0)
3321 def test_set_timeouts(self):
3322 """ Set deterministic NAT timeouts """
3323 timeouts_before = self.vapi.nat_det_get_timeouts()
3325 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
3326 timeouts_before.tcp_established + 10,
3327 timeouts_before.tcp_transitory + 10,
3328 timeouts_before.icmp + 10)
3330 timeouts_after = self.vapi.nat_det_get_timeouts()
3332 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
3333 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
3334 self.assertNotEqual(timeouts_before.tcp_established,
3335 timeouts_after.tcp_established)
3336 self.assertNotEqual(timeouts_before.tcp_transitory,
3337 timeouts_after.tcp_transitory)
3339 def test_det_in(self):
3340 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3342 nat_ip = "10.0.0.10"
3344 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3346 socket.inet_aton(nat_ip),
3348 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3349 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3353 pkts = self.create_stream_in(self.pg0, self.pg1)
3354 self.pg0.add_stream(pkts)
3355 self.pg_enable_capture(self.pg_interfaces)
3357 capture = self.pg1.get_capture(len(pkts))
3358 self.verify_capture_out(capture, nat_ip)
3361 pkts = self.create_stream_out(self.pg1, nat_ip)
3362 self.pg1.add_stream(pkts)
3363 self.pg_enable_capture(self.pg_interfaces)
3365 capture = self.pg0.get_capture(len(pkts))
3366 self.verify_capture_in(capture, self.pg0)
3369 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3370 self.assertEqual(len(sessions), 3)
3374 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3375 self.assertEqual(s.in_port, self.tcp_port_in)
3376 self.assertEqual(s.out_port, self.tcp_port_out)
3377 self.assertEqual(s.ext_port, self.tcp_external_port)
3381 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3382 self.assertEqual(s.in_port, self.udp_port_in)
3383 self.assertEqual(s.out_port, self.udp_port_out)
3384 self.assertEqual(s.ext_port, self.udp_external_port)
3388 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3389 self.assertEqual(s.in_port, self.icmp_id_in)
3390 self.assertEqual(s.out_port, self.icmp_external_id)
3392 def test_multiple_users(self):
3393 """ Deterministic NAT multiple users """
3395 nat_ip = "10.0.0.10"
3397 external_port = 6303
3399 host0 = self.pg0.remote_hosts[0]
3400 host1 = self.pg0.remote_hosts[1]
3402 self.vapi.nat_det_add_del_map(host0.ip4n,
3404 socket.inet_aton(nat_ip),
3406 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3407 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3411 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3412 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3413 TCP(sport=port_in, dport=external_port))
3414 self.pg0.add_stream(p)
3415 self.pg_enable_capture(self.pg_interfaces)
3417 capture = self.pg1.get_capture(1)
3422 self.assertEqual(ip.src, nat_ip)
3423 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3424 self.assertEqual(tcp.dport, external_port)
3425 port_out0 = tcp.sport
3427 self.logger.error(ppp("Unexpected or invalid packet:", p))
3431 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
3432 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
3433 TCP(sport=port_in, dport=external_port))
3434 self.pg0.add_stream(p)
3435 self.pg_enable_capture(self.pg_interfaces)
3437 capture = self.pg1.get_capture(1)
3442 self.assertEqual(ip.src, nat_ip)
3443 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3444 self.assertEqual(tcp.dport, external_port)
3445 port_out1 = tcp.sport
3447 self.logger.error(ppp("Unexpected or invalid packet:", p))
3450 dms = self.vapi.nat_det_map_dump()
3451 self.assertEqual(1, len(dms))
3452 self.assertEqual(2, dms[0].ses_num)
3455 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3456 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3457 TCP(sport=external_port, dport=port_out0))
3458 self.pg1.add_stream(p)
3459 self.pg_enable_capture(self.pg_interfaces)
3461 capture = self.pg0.get_capture(1)
3466 self.assertEqual(ip.src, self.pg1.remote_ip4)
3467 self.assertEqual(ip.dst, host0.ip4)
3468 self.assertEqual(tcp.dport, port_in)
3469 self.assertEqual(tcp.sport, external_port)
3471 self.logger.error(ppp("Unexpected or invalid packet:", p))
3475 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3476 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3477 TCP(sport=external_port, dport=port_out1))
3478 self.pg1.add_stream(p)
3479 self.pg_enable_capture(self.pg_interfaces)
3481 capture = self.pg0.get_capture(1)
3486 self.assertEqual(ip.src, self.pg1.remote_ip4)
3487 self.assertEqual(ip.dst, host1.ip4)
3488 self.assertEqual(tcp.dport, port_in)
3489 self.assertEqual(tcp.sport, external_port)
3491 self.logger.error(ppp("Unexpected or invalid packet", p))
3494 # session close api test
3495 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
3497 self.pg1.remote_ip4n,
3499 dms = self.vapi.nat_det_map_dump()
3500 self.assertEqual(dms[0].ses_num, 1)
3502 self.vapi.nat_det_close_session_in(host0.ip4n,
3504 self.pg1.remote_ip4n,
3506 dms = self.vapi.nat_det_map_dump()
3507 self.assertEqual(dms[0].ses_num, 0)
3509 def test_tcp_session_close_detection_in(self):
3510 """ Deterministic NAT TCP session close from inside network """
3511 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3513 socket.inet_aton(self.nat_addr),
3515 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3516 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3519 self.initiate_tcp_session(self.pg0, self.pg1)
3521 # close the session from inside
3523 # FIN packet in -> out
3524 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3525 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3526 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3528 self.pg0.add_stream(p)
3529 self.pg_enable_capture(self.pg_interfaces)
3531 self.pg1.get_capture(1)
3535 # ACK packet out -> in
3536 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3537 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3538 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3542 # FIN packet out -> in
3543 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3544 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3545 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3549 self.pg1.add_stream(pkts)
3550 self.pg_enable_capture(self.pg_interfaces)
3552 self.pg0.get_capture(2)
3554 # ACK packet in -> out
3555 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3556 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3557 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3559 self.pg0.add_stream(p)
3560 self.pg_enable_capture(self.pg_interfaces)
3562 self.pg1.get_capture(1)
3564 # Check if deterministic NAT44 closed the session
3565 dms = self.vapi.nat_det_map_dump()
3566 self.assertEqual(0, dms[0].ses_num)
3568 self.logger.error("TCP session termination failed")
3571 def test_tcp_session_close_detection_out(self):
3572 """ Deterministic NAT TCP session close from outside network """
3573 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3575 socket.inet_aton(self.nat_addr),
3577 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3578 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3581 self.initiate_tcp_session(self.pg0, self.pg1)
3583 # close the session from outside
3585 # FIN packet out -> in
3586 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3587 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3588 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3590 self.pg1.add_stream(p)
3591 self.pg_enable_capture(self.pg_interfaces)
3593 self.pg0.get_capture(1)
3597 # ACK packet in -> out
3598 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3599 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3600 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3604 # ACK packet in -> out
3605 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3606 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3607 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3611 self.pg0.add_stream(pkts)
3612 self.pg_enable_capture(self.pg_interfaces)
3614 self.pg1.get_capture(2)
3616 # ACK packet out -> in
3617 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3618 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3619 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3621 self.pg1.add_stream(p)
3622 self.pg_enable_capture(self.pg_interfaces)
3624 self.pg0.get_capture(1)
3626 # Check if deterministic NAT44 closed the session
3627 dms = self.vapi.nat_det_map_dump()
3628 self.assertEqual(0, dms[0].ses_num)
3630 self.logger.error("TCP session termination failed")
3633 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3634 def test_session_timeout(self):
3635 """ Deterministic NAT session timeouts """
3636 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3638 socket.inet_aton(self.nat_addr),
3640 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3641 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3644 self.initiate_tcp_session(self.pg0, self.pg1)
3645 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3646 pkts = self.create_stream_in(self.pg0, self.pg1)
3647 self.pg0.add_stream(pkts)
3648 self.pg_enable_capture(self.pg_interfaces)
3650 capture = self.pg1.get_capture(len(pkts))
3653 dms = self.vapi.nat_det_map_dump()
3654 self.assertEqual(0, dms[0].ses_num)
3656 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3657 def test_session_limit_per_user(self):
3658 """ Deterministic NAT maximum sessions per user limit """
3659 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3661 socket.inet_aton(self.nat_addr),
3663 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3664 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3666 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3667 src_address=self.pg2.local_ip4n,
3669 template_interval=10)
3670 self.vapi.nat_ipfix()
3673 for port in range(1025, 2025):
3674 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3675 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3676 UDP(sport=port, dport=port))
3679 self.pg0.add_stream(pkts)
3680 self.pg_enable_capture(self.pg_interfaces)
3682 capture = self.pg1.get_capture(len(pkts))
3684 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3685 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3686 UDP(sport=3001, dport=3002))
3687 self.pg0.add_stream(p)
3688 self.pg_enable_capture(self.pg_interfaces)
3690 capture = self.pg1.assert_nothing_captured()
3692 # verify ICMP error packet
3693 capture = self.pg0.get_capture(1)
3695 self.assertTrue(p.haslayer(ICMP))
3697 self.assertEqual(icmp.type, 3)
3698 self.assertEqual(icmp.code, 1)
3699 self.assertTrue(icmp.haslayer(IPerror))
3700 inner_ip = icmp[IPerror]
3701 self.assertEqual(inner_ip[UDPerror].sport, 3001)
3702 self.assertEqual(inner_ip[UDPerror].dport, 3002)
3704 dms = self.vapi.nat_det_map_dump()
3706 self.assertEqual(1000, dms[0].ses_num)
3708 # verify IPFIX logging
3709 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3711 capture = self.pg2.get_capture(2)
3712 ipfix = IPFIXDecoder()
3713 # first load template
3715 self.assertTrue(p.haslayer(IPFIX))
3716 if p.haslayer(Template):
3717 ipfix.add_template(p.getlayer(Template))
3718 # verify events in data set
3720 if p.haslayer(Data):
3721 data = ipfix.decode_data_set(p.getlayer(Set))
3722 self.verify_ipfix_max_entries_per_user(data)
3724 def clear_nat_det(self):
3726 Clear deterministic NAT configuration.
3728 self.vapi.nat_ipfix(enable=0)
3729 self.vapi.nat_det_set_timeouts()
3730 deterministic_mappings = self.vapi.nat_det_map_dump()
3731 for dsm in deterministic_mappings:
3732 self.vapi.nat_det_add_del_map(dsm.in_addr,
3738 interfaces = self.vapi.nat44_interface_dump()
3739 for intf in interfaces:
3740 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3745 super(TestDeterministicNAT, self).tearDown()
3746 if not self.vpp_dead:
3747 self.logger.info(self.vapi.cli("show nat44 detail"))
3748 self.clear_nat_det()
3751 class TestNAT64(MethodHolder):
3752 """ NAT64 Test Cases """
3755 def setUpClass(cls):
3756 super(TestNAT64, cls).setUpClass()
3759 cls.tcp_port_in = 6303
3760 cls.tcp_port_out = 6303
3761 cls.udp_port_in = 6304
3762 cls.udp_port_out = 6304
3763 cls.icmp_id_in = 6305
3764 cls.icmp_id_out = 6305
3765 cls.nat_addr = '10.0.0.3'
3766 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3768 cls.vrf1_nat_addr = '10.0.10.3'
3769 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3772 cls.create_pg_interfaces(range(5))
3773 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3774 cls.ip6_interfaces.append(cls.pg_interfaces[2])
3775 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3777 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3779 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3781 cls.pg0.generate_remote_hosts(2)
3783 for i in cls.ip6_interfaces:
3786 i.configure_ipv6_neighbors()
3788 for i in cls.ip4_interfaces:
3794 cls.pg3.config_ip4()
3795 cls.pg3.resolve_arp()
3796 cls.pg3.config_ip6()
3797 cls.pg3.configure_ipv6_neighbors()
3800 super(TestNAT64, cls).tearDownClass()
3803 def test_pool(self):
3804 """ Add/delete address to NAT64 pool """
3805 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3807 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3809 addresses = self.vapi.nat64_pool_addr_dump()
3810 self.assertEqual(len(addresses), 1)
3811 self.assertEqual(addresses[0].address, nat_addr)
3813 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3815 addresses = self.vapi.nat64_pool_addr_dump()
3816 self.assertEqual(len(addresses), 0)
3818 def test_interface(self):
3819 """ Enable/disable NAT64 feature on the interface """
3820 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3821 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3823 interfaces = self.vapi.nat64_interface_dump()
3824 self.assertEqual(len(interfaces), 2)
3827 for intf in interfaces:
3828 if intf.sw_if_index == self.pg0.sw_if_index:
3829 self.assertEqual(intf.is_inside, 1)
3831 elif intf.sw_if_index == self.pg1.sw_if_index:
3832 self.assertEqual(intf.is_inside, 0)
3834 self.assertTrue(pg0_found)
3835 self.assertTrue(pg1_found)
3837 features = self.vapi.cli("show interface features pg0")
3838 self.assertNotEqual(features.find('nat64-in2out'), -1)
3839 features = self.vapi.cli("show interface features pg1")
3840 self.assertNotEqual(features.find('nat64-out2in'), -1)
3842 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3843 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3845 interfaces = self.vapi.nat64_interface_dump()
3846 self.assertEqual(len(interfaces), 0)
3848 def test_static_bib(self):
3849 """ Add/delete static BIB entry """
3850 in_addr = socket.inet_pton(socket.AF_INET6,
3851 '2001:db8:85a3::8a2e:370:7334')
3852 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3855 proto = IP_PROTOS.tcp
3857 self.vapi.nat64_add_del_static_bib(in_addr,
3862 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3867 self.assertEqual(bibe.i_addr, in_addr)
3868 self.assertEqual(bibe.o_addr, out_addr)
3869 self.assertEqual(bibe.i_port, in_port)
3870 self.assertEqual(bibe.o_port, out_port)
3871 self.assertEqual(static_bib_num, 1)
3873 self.vapi.nat64_add_del_static_bib(in_addr,
3879 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3884 self.assertEqual(static_bib_num, 0)
3886 def test_set_timeouts(self):
3887 """ Set NAT64 timeouts """
3888 # verify default values
3889 timeouts = self.vapi.nat64_get_timeouts()
3890 self.assertEqual(timeouts.udp, 300)
3891 self.assertEqual(timeouts.icmp, 60)
3892 self.assertEqual(timeouts.tcp_trans, 240)
3893 self.assertEqual(timeouts.tcp_est, 7440)
3894 self.assertEqual(timeouts.tcp_incoming_syn, 6)
3896 # set and verify custom values
3897 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3898 tcp_est=7450, tcp_incoming_syn=10)
3899 timeouts = self.vapi.nat64_get_timeouts()
3900 self.assertEqual(timeouts.udp, 200)
3901 self.assertEqual(timeouts.icmp, 30)
3902 self.assertEqual(timeouts.tcp_trans, 250)
3903 self.assertEqual(timeouts.tcp_est, 7450)
3904 self.assertEqual(timeouts.tcp_incoming_syn, 10)
3906 def test_dynamic(self):
3907 """ NAT64 dynamic translation test """
3908 self.tcp_port_in = 6303
3909 self.udp_port_in = 6304
3910 self.icmp_id_in = 6305
3912 ses_num_start = self.nat64_get_ses_num()
3914 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3916 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3917 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3920 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3921 self.pg0.add_stream(pkts)
3922 self.pg_enable_capture(self.pg_interfaces)
3924 capture = self.pg1.get_capture(len(pkts))
3925 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3926 dst_ip=self.pg1.remote_ip4)
3929 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3930 self.pg1.add_stream(pkts)
3931 self.pg_enable_capture(self.pg_interfaces)
3933 capture = self.pg0.get_capture(len(pkts))
3934 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3935 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3938 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3939 self.pg0.add_stream(pkts)
3940 self.pg_enable_capture(self.pg_interfaces)
3942 capture = self.pg1.get_capture(len(pkts))
3943 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3944 dst_ip=self.pg1.remote_ip4)
3947 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3948 self.pg1.add_stream(pkts)
3949 self.pg_enable_capture(self.pg_interfaces)
3951 capture = self.pg0.get_capture(len(pkts))
3952 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3954 ses_num_end = self.nat64_get_ses_num()
3956 self.assertEqual(ses_num_end - ses_num_start, 3)
3958 # tenant with specific VRF
3959 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3960 self.vrf1_nat_addr_n,
3961 vrf_id=self.vrf1_id)
3962 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3964 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3965 self.pg2.add_stream(pkts)
3966 self.pg_enable_capture(self.pg_interfaces)
3968 capture = self.pg1.get_capture(len(pkts))
3969 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3970 dst_ip=self.pg1.remote_ip4)
3972 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3973 self.pg1.add_stream(pkts)
3974 self.pg_enable_capture(self.pg_interfaces)
3976 capture = self.pg2.get_capture(len(pkts))
3977 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3979 def test_static(self):
3980 """ NAT64 static translation test """
3981 self.tcp_port_in = 60303
3982 self.udp_port_in = 60304
3983 self.icmp_id_in = 60305
3984 self.tcp_port_out = 60303
3985 self.udp_port_out = 60304
3986 self.icmp_id_out = 60305
3988 ses_num_start = self.nat64_get_ses_num()
3990 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3992 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3993 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3995 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4000 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4005 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4012 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4013 self.pg0.add_stream(pkts)
4014 self.pg_enable_capture(self.pg_interfaces)
4016 capture = self.pg1.get_capture(len(pkts))
4017 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4018 dst_ip=self.pg1.remote_ip4, same_port=True)
4021 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4022 self.pg1.add_stream(pkts)
4023 self.pg_enable_capture(self.pg_interfaces)
4025 capture = self.pg0.get_capture(len(pkts))
4026 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4027 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4029 ses_num_end = self.nat64_get_ses_num()
4031 self.assertEqual(ses_num_end - ses_num_start, 3)
4033 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4034 def test_session_timeout(self):
4035 """ NAT64 session timeout """
4036 self.icmp_id_in = 1234
4037 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4039 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4040 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4041 self.vapi.nat64_set_timeouts(icmp=5)
4043 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4044 self.pg0.add_stream(pkts)
4045 self.pg_enable_capture(self.pg_interfaces)
4047 capture = self.pg1.get_capture(len(pkts))
4049 ses_num_before_timeout = self.nat64_get_ses_num()
4053 # ICMP session after timeout
4054 ses_num_after_timeout = self.nat64_get_ses_num()
4055 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4057 def test_icmp_error(self):
4058 """ NAT64 ICMP Error message translation """
4059 self.tcp_port_in = 6303
4060 self.udp_port_in = 6304
4061 self.icmp_id_in = 6305
4063 ses_num_start = self.nat64_get_ses_num()
4065 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4067 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4068 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4070 # send some packets to create sessions
4071 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4072 self.pg0.add_stream(pkts)
4073 self.pg_enable_capture(self.pg_interfaces)
4075 capture_ip4 = self.pg1.get_capture(len(pkts))
4076 self.verify_capture_out(capture_ip4,
4077 nat_ip=self.nat_addr,
4078 dst_ip=self.pg1.remote_ip4)
4080 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4081 self.pg1.add_stream(pkts)
4082 self.pg_enable_capture(self.pg_interfaces)
4084 capture_ip6 = self.pg0.get_capture(len(pkts))
4085 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4086 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4087 self.pg0.remote_ip6)
4090 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4091 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4092 ICMPv6DestUnreach(code=1) /
4093 packet[IPv6] for packet in capture_ip6]
4094 self.pg0.add_stream(pkts)
4095 self.pg_enable_capture(self.pg_interfaces)
4097 capture = self.pg1.get_capture(len(pkts))
4098 for packet in capture:
4100 self.assertEqual(packet[IP].src, self.nat_addr)
4101 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4102 self.assertEqual(packet[ICMP].type, 3)
4103 self.assertEqual(packet[ICMP].code, 13)
4104 inner = packet[IPerror]
4105 self.assertEqual(inner.src, self.pg1.remote_ip4)
4106 self.assertEqual(inner.dst, self.nat_addr)
4107 self.check_icmp_checksum(packet)
4108 if inner.haslayer(TCPerror):
4109 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4110 elif inner.haslayer(UDPerror):
4111 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4113 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4115 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4119 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4120 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4121 ICMP(type=3, code=13) /
4122 packet[IP] for packet in capture_ip4]
4123 self.pg1.add_stream(pkts)
4124 self.pg_enable_capture(self.pg_interfaces)
4126 capture = self.pg0.get_capture(len(pkts))
4127 for packet in capture:
4129 self.assertEqual(packet[IPv6].src, ip.src)
4130 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4131 icmp = packet[ICMPv6DestUnreach]
4132 self.assertEqual(icmp.code, 1)
4133 inner = icmp[IPerror6]
4134 self.assertEqual(inner.src, self.pg0.remote_ip6)
4135 self.assertEqual(inner.dst, ip.src)
4136 self.check_icmpv6_checksum(packet)
4137 if inner.haslayer(TCPerror):
4138 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4139 elif inner.haslayer(UDPerror):
4140 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4142 self.assertEqual(inner[ICMPv6EchoRequest].id,
4145 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4148 def test_hairpinning(self):
4149 """ NAT64 hairpinning """
4151 client = self.pg0.remote_hosts[0]
4152 server = self.pg0.remote_hosts[1]
4153 server_tcp_in_port = 22
4154 server_tcp_out_port = 4022
4155 server_udp_in_port = 23
4156 server_udp_out_port = 4023
4157 client_tcp_in_port = 1234
4158 client_udp_in_port = 1235
4159 client_tcp_out_port = 0
4160 client_udp_out_port = 0
4161 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4162 nat_addr_ip6 = ip.src
4164 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4166 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4167 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4169 self.vapi.nat64_add_del_static_bib(server.ip6n,
4172 server_tcp_out_port,
4174 self.vapi.nat64_add_del_static_bib(server.ip6n,
4177 server_udp_out_port,
4182 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4183 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4184 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4186 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4187 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4188 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4190 self.pg0.add_stream(pkts)
4191 self.pg_enable_capture(self.pg_interfaces)
4193 capture = self.pg0.get_capture(len(pkts))
4194 for packet in capture:
4196 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4197 self.assertEqual(packet[IPv6].dst, server.ip6)
4198 if packet.haslayer(TCP):
4199 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4200 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4201 self.check_tcp_checksum(packet)
4202 client_tcp_out_port = packet[TCP].sport
4204 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4205 self.assertEqual(packet[UDP].dport, server_udp_in_port)
4206 self.check_udp_checksum(packet)
4207 client_udp_out_port = packet[UDP].sport
4209 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4214 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4215 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4216 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4218 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4219 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4220 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4222 self.pg0.add_stream(pkts)
4223 self.pg_enable_capture(self.pg_interfaces)
4225 capture = self.pg0.get_capture(len(pkts))
4226 for packet in capture:
4228 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4229 self.assertEqual(packet[IPv6].dst, client.ip6)
4230 if packet.haslayer(TCP):
4231 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
4232 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
4233 self.check_tcp_checksum(packet)
4235 self.assertEqual(packet[UDP].sport, server_udp_out_port)
4236 self.assertEqual(packet[UDP].dport, client_udp_in_port)
4237 self.check_udp_checksum(packet)
4239 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4244 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4245 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4246 ICMPv6DestUnreach(code=1) /
4247 packet[IPv6] for packet in capture]
4248 self.pg0.add_stream(pkts)
4249 self.pg_enable_capture(self.pg_interfaces)
4251 capture = self.pg0.get_capture(len(pkts))
4252 for packet in capture:
4254 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4255 self.assertEqual(packet[IPv6].dst, server.ip6)
4256 icmp = packet[ICMPv6DestUnreach]
4257 self.assertEqual(icmp.code, 1)
4258 inner = icmp[IPerror6]
4259 self.assertEqual(inner.src, server.ip6)
4260 self.assertEqual(inner.dst, nat_addr_ip6)
4261 self.check_icmpv6_checksum(packet)
4262 if inner.haslayer(TCPerror):
4263 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
4264 self.assertEqual(inner[TCPerror].dport,
4265 client_tcp_out_port)
4267 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
4268 self.assertEqual(inner[UDPerror].dport,
4269 client_udp_out_port)
4271 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4274 def test_prefix(self):
4275 """ NAT64 Network-Specific Prefix """
4277 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4279 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4280 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4281 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4282 self.vrf1_nat_addr_n,
4283 vrf_id=self.vrf1_id)
4284 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4287 global_pref64 = "2001:db8::"
4288 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
4289 global_pref64_len = 32
4290 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
4292 prefix = self.vapi.nat64_prefix_dump()
4293 self.assertEqual(len(prefix), 1)
4294 self.assertEqual(prefix[0].prefix, global_pref64_n)
4295 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
4296 self.assertEqual(prefix[0].vrf_id, 0)
4298 # Add tenant specific prefix
4299 vrf1_pref64 = "2001:db8:122:300::"
4300 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
4301 vrf1_pref64_len = 56
4302 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
4304 vrf_id=self.vrf1_id)
4305 prefix = self.vapi.nat64_prefix_dump()
4306 self.assertEqual(len(prefix), 2)
4309 pkts = self.create_stream_in_ip6(self.pg0,
4312 plen=global_pref64_len)
4313 self.pg0.add_stream(pkts)
4314 self.pg_enable_capture(self.pg_interfaces)
4316 capture = self.pg1.get_capture(len(pkts))
4317 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4318 dst_ip=self.pg1.remote_ip4)
4320 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4321 self.pg1.add_stream(pkts)
4322 self.pg_enable_capture(self.pg_interfaces)
4324 capture = self.pg0.get_capture(len(pkts))
4325 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4328 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
4330 # Tenant specific prefix
4331 pkts = self.create_stream_in_ip6(self.pg2,
4334 plen=vrf1_pref64_len)
4335 self.pg2.add_stream(pkts)
4336 self.pg_enable_capture(self.pg_interfaces)
4338 capture = self.pg1.get_capture(len(pkts))
4339 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4340 dst_ip=self.pg1.remote_ip4)
4342 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4343 self.pg1.add_stream(pkts)
4344 self.pg_enable_capture(self.pg_interfaces)
4346 capture = self.pg2.get_capture(len(pkts))
4347 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4350 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4352 def test_unknown_proto(self):
4353 """ NAT64 translate packet with unknown protocol """
4355 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4357 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4358 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4359 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4362 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4363 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4364 TCP(sport=self.tcp_port_in, dport=20))
4365 self.pg0.add_stream(p)
4366 self.pg_enable_capture(self.pg_interfaces)
4368 p = self.pg1.get_capture(1)
4370 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4371 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4373 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4374 TCP(sport=1234, dport=1234))
4375 self.pg0.add_stream(p)
4376 self.pg_enable_capture(self.pg_interfaces)
4378 p = self.pg1.get_capture(1)
4381 self.assertEqual(packet[IP].src, self.nat_addr)
4382 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4383 self.assertTrue(packet.haslayer(GRE))
4384 self.check_ip_checksum(packet)
4386 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4390 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4391 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4393 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4394 TCP(sport=1234, dport=1234))
4395 self.pg1.add_stream(p)
4396 self.pg_enable_capture(self.pg_interfaces)
4398 p = self.pg0.get_capture(1)
4401 self.assertEqual(packet[IPv6].src, remote_ip6)
4402 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4403 self.assertEqual(packet[IPv6].nh, 47)
4405 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4408 def test_hairpinning_unknown_proto(self):
4409 """ NAT64 translate packet with unknown protocol - hairpinning """
4411 client = self.pg0.remote_hosts[0]
4412 server = self.pg0.remote_hosts[1]
4413 server_tcp_in_port = 22
4414 server_tcp_out_port = 4022
4415 client_tcp_in_port = 1234
4416 client_tcp_out_port = 1235
4417 server_nat_ip = "10.0.0.100"
4418 client_nat_ip = "10.0.0.110"
4419 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
4420 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
4421 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
4422 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
4424 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
4426 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4427 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4429 self.vapi.nat64_add_del_static_bib(server.ip6n,
4432 server_tcp_out_port,
4435 self.vapi.nat64_add_del_static_bib(server.ip6n,
4441 self.vapi.nat64_add_del_static_bib(client.ip6n,
4444 client_tcp_out_port,
4448 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4449 IPv6(src=client.ip6, dst=server_nat_ip6) /
4450 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4451 self.pg0.add_stream(p)
4452 self.pg_enable_capture(self.pg_interfaces)
4454 p = self.pg0.get_capture(1)
4456 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4457 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
4459 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4460 TCP(sport=1234, dport=1234))
4461 self.pg0.add_stream(p)
4462 self.pg_enable_capture(self.pg_interfaces)
4464 p = self.pg0.get_capture(1)
4467 self.assertEqual(packet[IPv6].src, client_nat_ip6)
4468 self.assertEqual(packet[IPv6].dst, server.ip6)
4469 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4471 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4475 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4476 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
4478 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4479 TCP(sport=1234, dport=1234))
4480 self.pg0.add_stream(p)
4481 self.pg_enable_capture(self.pg_interfaces)
4483 p = self.pg0.get_capture(1)
4486 self.assertEqual(packet[IPv6].src, server_nat_ip6)
4487 self.assertEqual(packet[IPv6].dst, client.ip6)
4488 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4490 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4493 def test_one_armed_nat64(self):
4494 """ One armed NAT64 """
4496 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
4500 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4502 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
4503 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
4506 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4507 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
4508 TCP(sport=12345, dport=80))
4509 self.pg3.add_stream(p)
4510 self.pg_enable_capture(self.pg_interfaces)
4512 capture = self.pg3.get_capture(1)
4517 self.assertEqual(ip.src, self.nat_addr)
4518 self.assertEqual(ip.dst, self.pg3.remote_ip4)
4519 self.assertNotEqual(tcp.sport, 12345)
4520 external_port = tcp.sport
4521 self.assertEqual(tcp.dport, 80)
4522 self.check_tcp_checksum(p)
4523 self.check_ip_checksum(p)
4525 self.logger.error(ppp("Unexpected or invalid packet:", p))
4529 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4530 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
4531 TCP(sport=80, dport=external_port))
4532 self.pg3.add_stream(p)
4533 self.pg_enable_capture(self.pg_interfaces)
4535 capture = self.pg3.get_capture(1)
4540 self.assertEqual(ip.src, remote_host_ip6)
4541 self.assertEqual(ip.dst, self.pg3.remote_ip6)
4542 self.assertEqual(tcp.sport, 80)
4543 self.assertEqual(tcp.dport, 12345)
4544 self.check_tcp_checksum(p)
4546 self.logger.error(ppp("Unexpected or invalid packet:", p))
4549 def test_frag_in_order(self):
4550 """ NAT64 translate fragments arriving in order """
4551 self.tcp_port_in = random.randint(1025, 65535)
4553 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4555 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4556 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4558 reass = self.vapi.nat_reass_dump()
4559 reass_n_start = len(reass)
4563 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4564 self.tcp_port_in, 20, data)
4565 self.pg0.add_stream(pkts)
4566 self.pg_enable_capture(self.pg_interfaces)
4568 frags = self.pg1.get_capture(len(pkts))
4569 p = self.reass_frags_and_verify(frags,
4571 self.pg1.remote_ip4)
4572 self.assertEqual(p[TCP].dport, 20)
4573 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4574 self.tcp_port_out = p[TCP].sport
4575 self.assertEqual(data, p[Raw].load)
4578 data = "A" * 4 + "b" * 16 + "C" * 3
4579 pkts = self.create_stream_frag(self.pg1,
4584 self.pg1.add_stream(pkts)
4585 self.pg_enable_capture(self.pg_interfaces)
4587 frags = self.pg0.get_capture(len(pkts))
4588 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4589 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4590 self.assertEqual(p[TCP].sport, 20)
4591 self.assertEqual(p[TCP].dport, self.tcp_port_in)
4592 self.assertEqual(data, p[Raw].load)
4594 reass = self.vapi.nat_reass_dump()
4595 reass_n_end = len(reass)
4597 self.assertEqual(reass_n_end - reass_n_start, 2)
4599 def test_reass_hairpinning(self):
4600 """ NAT64 fragments hairpinning """
4602 client = self.pg0.remote_hosts[0]
4603 server = self.pg0.remote_hosts[1]
4604 server_in_port = random.randint(1025, 65535)
4605 server_out_port = random.randint(1025, 65535)
4606 client_in_port = random.randint(1025, 65535)
4607 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4608 nat_addr_ip6 = ip.src
4610 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4612 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4613 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4615 # add static BIB entry for server
4616 self.vapi.nat64_add_del_static_bib(server.ip6n,
4622 # send packet from host to server
4623 pkts = self.create_stream_frag_ip6(self.pg0,
4628 self.pg0.add_stream(pkts)
4629 self.pg_enable_capture(self.pg_interfaces)
4631 frags = self.pg0.get_capture(len(pkts))
4632 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
4633 self.assertNotEqual(p[TCP].sport, client_in_port)
4634 self.assertEqual(p[TCP].dport, server_in_port)
4635 self.assertEqual(data, p[Raw].load)
4637 def test_frag_out_of_order(self):
4638 """ NAT64 translate fragments arriving out of order """
4639 self.tcp_port_in = random.randint(1025, 65535)
4641 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4643 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4644 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4648 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4649 self.tcp_port_in, 20, data)
4651 self.pg0.add_stream(pkts)
4652 self.pg_enable_capture(self.pg_interfaces)
4654 frags = self.pg1.get_capture(len(pkts))
4655 p = self.reass_frags_and_verify(frags,
4657 self.pg1.remote_ip4)
4658 self.assertEqual(p[TCP].dport, 20)
4659 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4660 self.tcp_port_out = p[TCP].sport
4661 self.assertEqual(data, p[Raw].load)
4664 data = "A" * 4 + "B" * 16 + "C" * 3
4665 pkts = self.create_stream_frag(self.pg1,
4671 self.pg1.add_stream(pkts)
4672 self.pg_enable_capture(self.pg_interfaces)
4674 frags = self.pg0.get_capture(len(pkts))
4675 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4676 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4677 self.assertEqual(p[TCP].sport, 20)
4678 self.assertEqual(p[TCP].dport, self.tcp_port_in)
4679 self.assertEqual(data, p[Raw].load)
4681 def test_interface_addr(self):
4682 """ Acquire NAT64 pool addresses from interface """
4683 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
4685 # no address in NAT64 pool
4686 adresses = self.vapi.nat44_address_dump()
4687 self.assertEqual(0, len(adresses))
4689 # configure interface address and check NAT64 address pool
4690 self.pg4.config_ip4()
4691 addresses = self.vapi.nat64_pool_addr_dump()
4692 self.assertEqual(len(addresses), 1)
4693 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
4695 # remove interface address and check NAT64 address pool
4696 self.pg4.unconfig_ip4()
4697 addresses = self.vapi.nat64_pool_addr_dump()
4698 self.assertEqual(0, len(adresses))
4700 def nat64_get_ses_num(self):
4702 Return number of active NAT64 sessions.
4704 st = self.vapi.nat64_st_dump()
4707 def clear_nat64(self):
4709 Clear NAT64 configuration.
4711 self.vapi.nat64_set_timeouts()
4713 interfaces = self.vapi.nat64_interface_dump()
4714 for intf in interfaces:
4715 if intf.is_inside > 1:
4716 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4719 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4723 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4726 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4734 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
4737 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4745 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
4748 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4756 adresses = self.vapi.nat64_pool_addr_dump()
4757 for addr in adresses:
4758 self.vapi.nat64_add_del_pool_addr_range(addr.address,
4763 prefixes = self.vapi.nat64_prefix_dump()
4764 for prefix in prefixes:
4765 self.vapi.nat64_add_del_prefix(prefix.prefix,
4767 vrf_id=prefix.vrf_id,
4771 super(TestNAT64, self).tearDown()
4772 if not self.vpp_dead:
4773 self.logger.info(self.vapi.cli("show nat64 pool"))
4774 self.logger.info(self.vapi.cli("show nat64 interfaces"))
4775 self.logger.info(self.vapi.cli("show nat64 prefix"))
4776 self.logger.info(self.vapi.cli("show nat64 bib all"))
4777 self.logger.info(self.vapi.cli("show nat64 session table all"))
4778 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4782 class TestDSlite(MethodHolder):
4783 """ DS-Lite Test Cases """
4786 def setUpClass(cls):
4787 super(TestDSlite, cls).setUpClass()
4790 cls.nat_addr = '10.0.0.3'
4791 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4793 cls.create_pg_interfaces(range(2))
4795 cls.pg0.config_ip4()
4796 cls.pg0.resolve_arp()
4798 cls.pg1.config_ip6()
4799 cls.pg1.generate_remote_hosts(2)
4800 cls.pg1.configure_ipv6_neighbors()
4803 super(TestDSlite, cls).tearDownClass()
4806 def test_dslite(self):
4807 """ Test DS-Lite """
4808 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
4810 aftr_ip4 = '192.0.0.1'
4811 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
4812 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
4813 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
4814 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
4817 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4818 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
4819 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4820 UDP(sport=20000, dport=10000))
4821 self.pg1.add_stream(p)
4822 self.pg_enable_capture(self.pg_interfaces)
4824 capture = self.pg0.get_capture(1)
4825 capture = capture[0]
4826 self.assertFalse(capture.haslayer(IPv6))
4827 self.assertEqual(capture[IP].src, self.nat_addr)
4828 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4829 self.assertNotEqual(capture[UDP].sport, 20000)
4830 self.assertEqual(capture[UDP].dport, 10000)
4831 self.check_ip_checksum(capture)
4832 out_port = capture[UDP].sport
4834 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4835 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4836 UDP(sport=10000, dport=out_port))
4837 self.pg0.add_stream(p)
4838 self.pg_enable_capture(self.pg_interfaces)
4840 capture = self.pg1.get_capture(1)
4841 capture = capture[0]
4842 self.assertEqual(capture[IPv6].src, aftr_ip6)
4843 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
4844 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4845 self.assertEqual(capture[IP].dst, '192.168.1.1')
4846 self.assertEqual(capture[UDP].sport, 10000)
4847 self.assertEqual(capture[UDP].dport, 20000)
4848 self.check_ip_checksum(capture)
4851 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4852 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4853 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4854 TCP(sport=20001, dport=10001))
4855 self.pg1.add_stream(p)
4856 self.pg_enable_capture(self.pg_interfaces)
4858 capture = self.pg0.get_capture(1)
4859 capture = capture[0]
4860 self.assertFalse(capture.haslayer(IPv6))
4861 self.assertEqual(capture[IP].src, self.nat_addr)
4862 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4863 self.assertNotEqual(capture[TCP].sport, 20001)
4864 self.assertEqual(capture[TCP].dport, 10001)
4865 self.check_ip_checksum(capture)
4866 self.check_tcp_checksum(capture)
4867 out_port = capture[TCP].sport
4869 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4870 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4871 TCP(sport=10001, dport=out_port))
4872 self.pg0.add_stream(p)
4873 self.pg_enable_capture(self.pg_interfaces)
4875 capture = self.pg1.get_capture(1)
4876 capture = capture[0]
4877 self.assertEqual(capture[IPv6].src, aftr_ip6)
4878 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4879 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4880 self.assertEqual(capture[IP].dst, '192.168.1.1')
4881 self.assertEqual(capture[TCP].sport, 10001)
4882 self.assertEqual(capture[TCP].dport, 20001)
4883 self.check_ip_checksum(capture)
4884 self.check_tcp_checksum(capture)
4887 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4888 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4889 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4890 ICMP(id=4000, type='echo-request'))
4891 self.pg1.add_stream(p)
4892 self.pg_enable_capture(self.pg_interfaces)
4894 capture = self.pg0.get_capture(1)
4895 capture = capture[0]
4896 self.assertFalse(capture.haslayer(IPv6))
4897 self.assertEqual(capture[IP].src, self.nat_addr)
4898 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4899 self.assertNotEqual(capture[ICMP].id, 4000)
4900 self.check_ip_checksum(capture)
4901 self.check_icmp_checksum(capture)
4902 out_id = capture[ICMP].id
4904 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4905 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4906 ICMP(id=out_id, type='echo-reply'))
4907 self.pg0.add_stream(p)
4908 self.pg_enable_capture(self.pg_interfaces)
4910 capture = self.pg1.get_capture(1)
4911 capture = capture[0]
4912 self.assertEqual(capture[IPv6].src, aftr_ip6)
4913 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4914 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4915 self.assertEqual(capture[IP].dst, '192.168.1.1')
4916 self.assertEqual(capture[ICMP].id, 4000)
4917 self.check_ip_checksum(capture)
4918 self.check_icmp_checksum(capture)
4920 # ping DS-Lite AFTR tunnel endpoint address
4921 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4922 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
4923 ICMPv6EchoRequest())
4924 self.pg1.add_stream(p)
4925 self.pg_enable_capture(self.pg_interfaces)
4927 capture = self.pg1.get_capture(1)
4928 self.assertEqual(1, len(capture))
4929 capture = capture[0]
4930 self.assertEqual(capture[IPv6].src, aftr_ip6)
4931 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4932 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
4935 super(TestDSlite, self).tearDown()
4936 if not self.vpp_dead:
4937 self.logger.info(self.vapi.cli("show dslite pool"))
4939 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
4940 self.logger.info(self.vapi.cli("show dslite sessions"))
4942 if __name__ == '__main__':
4943 unittest.main(testRunner=VppTestRunner)