9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
# Base test case that holds the NAT packet-creation and capture-verification
# helpers; TestNAT44 derives from it.
# NOTE(review): the `def` lines owning the two super() calls below are absent
# from this listing - presumably setUpClass (uses cls) and tearDown (uses self).
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-dissected from its raw bytes, the IP checksum field
    is cleared and the packet is rebuilt; the resulting checksum is then
    compared with the value carried by the original packet.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clear the field first: without this the rebuilt packet merely echoes
    # the captured checksum and the assertion below could never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is re-dissected from its raw bytes, the TCP checksum field
    is cleared and the packet is rebuilt; the resulting checksum is then
    compared with the value carried by the original packet.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clear the field first: without this the rebuilt packet merely echoes
    # the captured checksum and the assertion below could never fail.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is re-dissected from its raw bytes, the UDP checksum field
    is cleared and the packet is rebuilt; the resulting checksum is then
    compared with the value carried by the original packet.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clear the field first: without this the rebuilt packet merely echoes
    # the captured checksum and the assertion below could never fail.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    For each embedded layer present in the packet (IPerror, TCPerror,
    UDPerror, ICMPerror) a fresh copy is dissected from the raw bytes,
    that layer's checksum is cleared, the copy is rebuilt and the result
    is compared with the checksum carried by the original packet.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    if pkt.haslayer(IPerror):
        new = pkt.__class__(str(pkt))
        del new['IPerror'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)

    if pkt.haslayer(TCPerror):
        new = pkt.__class__(str(pkt))
        del new['TCPerror'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)

    if pkt.haslayer(UDPerror):
        # An embedded UDP checksum of 0 is not verified.
        if pkt['UDPerror'].chksum != 0:
            new = pkt.__class__(str(pkt))
            del new['UDPerror'].chksum
            new = new.__class__(str(new))
            self.assertEqual(new['UDPerror'].chksum,
                             pkt['UDPerror'].chksum)

    if pkt.haslayer(ICMPerror):
        # Start from a fresh dissection like the other branches do, so the
        # check does not depend on (or fail for lack of) a copy left over
        # from an earlier branch.
        new = pkt.__class__(str(pkt))
        del new['ICMPerror'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
# Verify the ICMP checksum of a packet: re-dissect it, clear the ICMP checksum
# field, rebuild, and compare the rebuilt value with the captured one. When the
# packet carries an IPerror layer the embedded checksums are verified as well.
100 def check_icmp_checksum(self, pkt):
102 Check ICMP checksum in IPv4 packet
104 :param pkt: Packet to check ICMP checksum
106 new = pkt.__class__(str(pkt))
107 del new['ICMP'].chksum
108 new = new.__class__(str(new))
109 self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110 if pkt.haslayer(IPerror):
111 self.check_icmp_errror_embedded(pkt)
# Verify the ICMPv6 checksum of a packet. The packet is re-dissected once; for
# each handled ICMPv6 layer that is present (destination unreachable, echo
# request, echo reply) the cksum field is cleared, the copy is rebuilt and the
# rebuilt value is compared with the captured one.
113 def check_icmpv6_checksum(self, pkt):
115 Check ICMPv6 checksum in IPv6 packet
117 :param pkt: Packet to check ICMPv6 checksum
119 new = pkt.__class__(str(pkt))
120 if pkt.haslayer(ICMPv6DestUnreach):
121 del new['ICMPv6DestUnreach'].cksum
122 new = new.__class__(str(new))
123 self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124 pkt['ICMPv6DestUnreach'].cksum)
# Also verify the checksums of the packet embedded in the error message.
125 self.check_icmp_errror_embedded(pkt)
126 if pkt.haslayer(ICMPv6EchoRequest):
127 del new['ICMPv6EchoRequest'].cksum
128 new = new.__class__(str(new))
129 self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130 pkt['ICMPv6EchoRequest'].cksum)
131 if pkt.haslayer(ICMPv6EchoReply):
132 del new['ICMPv6EchoReply'].cksum
133 new = new.__class__(str(new))
134 self.assertEqual(new['ICMPv6EchoReply'].cksum,
135 pkt['ICMPv6EchoReply'].cksum)
# Build one TCP, one UDP and one ICMP echo-request packet going from the
# inside interface's remote host to the outside interface's remote host, using
# self.tcp_port_in / self.udp_port_in / self.icmp_id_in as source identifiers.
# NOTE(review): the lines that collect and return the packets are absent from
# this listing; callers treat the result as a sequence (len(pkts), add_stream).
137 def create_stream_in(self, in_if, out_if, ttl=64):
139 Create packet stream for inside network
141 :param in_if: Inside interface
142 :param out_if: Outside interface
143 :param ttl: TTL of generated packets
147 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149 TCP(sport=self.tcp_port_in, dport=20))
153 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155 UDP(sport=self.udp_port_in, dport=20))
159 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
160 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
161 ICMP(id=self.icmp_id_in, type='echo-request'))
# Embed an IPv4 address into an IPv6 prefix: both are converted to per-byte
# lists, selected bytes of the prefix are overwritten with the IPv4 bytes, and
# the result is converted back to a textual IPv6 address.
# NOTE(review): the conditions on plen that choose between the assignment
# groups below (and some of the assignments) are absent from this listing.
# NOTE(review): list(inet_pton(...)) followed by ''.join(...) only works where
# inet_pton returns a str (Python 2); confirm before running on Python 3.
166 def compose_ip6(self, ip4, pref, plen):
168 Compose IPv4-embedded IPv6 addresses
170 :param ip4: IPv4 address
171 :param pref: IPv6 prefix
172 :param plen: IPv6 prefix length
173 :returns: IPv4-embedded IPv6 addresses
175 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
176 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
191 pref_n[10] = ip4_n[3]
195 pref_n[10] = ip4_n[2]
196 pref_n[11] = ip4_n[3]
199 pref_n[10] = ip4_n[1]
200 pref_n[11] = ip4_n[2]
201 pref_n[12] = ip4_n[3]
203 pref_n[12] = ip4_n[0]
204 pref_n[13] = ip4_n[1]
205 pref_n[14] = ip4_n[2]
206 pref_n[15] = ip4_n[3]
207 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
# Build one TCP, one UDP and one ICMPv6 echo-request packet from the inside
# interface's remote IPv6 host towards an IPv6 destination that embeds the
# outside host's IPv4 address.
# NOTE(review): the condition choosing between the two `dst` assignments is
# absent from this listing - presumably the '64:ff9b::' form is used when no
# prefix is given and compose_ip6() otherwise. The collect/return lines are
# absent as well.
209 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
211 Create IPv6 packet stream for inside network
213 :param in_if: Inside interface
214 :param out_if: Outside interface
215 :param hlim: Hop Limit of generated packets
216 :param pref: NAT64 prefix
217 :param plen: NAT64 prefix length
221 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
223 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
226 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228 TCP(sport=self.tcp_port_in, dport=20))
232 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234 UDP(sport=self.udp_port_in, dport=20))
238 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
239 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
240 ICMPv6EchoRequest(id=self.icmp_id_in))
# Build one TCP, one UDP and one ICMP echo-reply packet from the outside
# interface's remote host towards dst_ip, addressed to the translated
# identifiers self.tcp_port_out / self.udp_port_out / self.icmp_id_out.
# NOTE(review): the guard around the `dst_ip = self.nat_addr` default and the
# collect/return lines are absent from this listing.
245 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
247 Create packet stream for outside network
249 :param out_if: Outside interface
250 :param dst_ip: Destination IP address (Default use global NAT address)
251 :param ttl: TTL of generated packets
254 dst_ip = self.nat_addr
257 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259 TCP(dport=self.tcp_port_out, sport=20))
263 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
264 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
265 UDP(dport=self.udp_port_out, sport=20))
269 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
270 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
271 ICMP(id=self.icmp_id_out, type='echo-reply'))
# Check packets captured on the outside network: count, IP checksum, translated
# source address, optional destination address and the per-protocol source
# port / ICMP id. Side effect: the observed TCP/UDP source port and ICMP id are
# stored in self.tcp_port_out / self.udp_port_out / self.icmp_id_out.
# NOTE(review): the same_port branching, the assertNotEqual( call heads and the
# try/except wrapper around the logger.error call are absent from this listing.
# NOTE(review): no UDP checksum check is visible in the UDP branch, unlike the
# TCP and ICMP branches - confirm whether that is intentional.
276 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
277 packet_num=3, dst_ip=None):
279 Verify captured packets on outside network
281 :param capture: Captured packets
282 :param nat_ip: Translated IP address (Default use global NAT address)
283 :param same_port: Source port number is not translated (Default False)
284 :param packet_num: Expected number of packets (Default 3)
285 :param dst_ip: Destination IP address (Default do not verify)
288 nat_ip = self.nat_addr
289 self.assertEqual(packet_num, len(capture))
290 for packet in capture:
292 self.check_ip_checksum(packet)
293 self.assertEqual(packet[IP].src, nat_ip)
294 if dst_ip is not None:
295 self.assertEqual(packet[IP].dst, dst_ip)
296 if packet.haslayer(TCP):
298 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
301 packet[TCP].sport, self.tcp_port_in)
302 self.tcp_port_out = packet[TCP].sport
303 self.check_tcp_checksum(packet)
304 elif packet.haslayer(UDP):
306 self.assertEqual(packet[UDP].sport, self.udp_port_in)
309 packet[UDP].sport, self.udp_port_in)
310 self.udp_port_out = packet[UDP].sport
313 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
315 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
316 self.icmp_id_out = packet[ICMP].id
317 self.check_icmp_checksum(packet)
319 self.logger.error(ppp("Unexpected or invalid packet "
320 "(outside network):", packet))
# Check packets captured on the inside network: count, IP checksum, destination
# address equal to the inside host, and the per-protocol destination port /
# ICMP id equal to the original inside values.
# NOTE(review): the try/except wrapper around the logger.error call is absent
# from this listing; no UDP checksum check is visible in the UDP branch.
323 def verify_capture_in(self, capture, in_if, packet_num=3):
325 Verify captured packets on inside network
327 :param capture: Captured packets
328 :param in_if: Inside interface
329 :param packet_num: Expected number of packets (Default 3)
331 self.assertEqual(packet_num, len(capture))
332 for packet in capture:
334 self.check_ip_checksum(packet)
335 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
336 if packet.haslayer(TCP):
337 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
338 self.check_tcp_checksum(packet)
339 elif packet.haslayer(UDP):
340 self.assertEqual(packet[UDP].dport, self.udp_port_in)
342 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
343 self.check_icmp_checksum(packet)
345 self.logger.error(ppp("Unexpected or invalid packet "
346 "(inside network):", packet))
# Check IPv6 packets captured on the inside network: count, source and
# destination address, per-protocol destination port / echo-reply id, and the
# TCP, UDP or ICMPv6 checksum.
# NOTE(review): the second argument of the ICMPv6EchoReply id assertion and the
# try/except wrapper around the logger.error call are absent from this listing.
349 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
351 Verify captured IPv6 packets on inside network
353 :param capture: Captured packets
354 :param src_ip: Source IP
355 :param dst_ip: Destination IP address
356 :param packet_num: Expected number of packets (Default 3)
358 self.assertEqual(packet_num, len(capture))
359 for packet in capture:
361 self.assertEqual(packet[IPv6].src, src_ip)
362 self.assertEqual(packet[IPv6].dst, dst_ip)
363 if packet.haslayer(TCP):
364 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
365 self.check_tcp_checksum(packet)
366 elif packet.haslayer(UDP):
367 self.assertEqual(packet[UDP].dport, self.udp_port_in)
368 self.check_udp_checksum(packet)
370 self.assertEqual(packet[ICMPv6EchoReply].id,
372 self.check_icmpv6_checksum(packet)
374 self.logger.error(ppp("Unexpected or invalid packet "
375 "(inside network):", packet))
# Check that captured packets passed through untranslated: addresses equal the
# ingress/egress remote hosts and the source port / ICMP id equal the original
# inside values. The number of captured packets is not checked here.
# NOTE(review): the try/except wrapper around the logger.error call is absent
# from this listing.
378 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
380 Verify captured packet that don't have to be translated
382 :param capture: Captured packets
383 :param ingress_if: Ingress interface
384 :param egress_if: Egress interface
386 for packet in capture:
388 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
389 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
390 if packet.haslayer(TCP):
391 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
392 elif packet.haslayer(UDP):
393 self.assertEqual(packet[UDP].sport, self.udp_port_in)
395 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
397 self.logger.error(ppp("Unexpected or invalid packet "
398 "(inside network):", packet))
# Check ICMP error packets captured on the outside network: count, source
# address, ICMP type, presence of an embedded IPerror layer, and that the
# embedded TCP/UDP destination port or ICMP id matches the translated value.
# NOTE(review): the assignment of `icmp`, the second arguments of the
# TCPerror/UDPerror assertions, the guard around the src_ip default and the
# try/except wrapper are absent from this listing.
401 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
402 packet_num=3, icmp_type=11):
404 Verify captured packets with ICMP errors on outside network
406 :param capture: Captured packets
407 :param src_ip: Translated IP address or IP address of VPP
408 (Default use global NAT address)
409 :param packet_num: Expected number of packets (Default 3)
410 :param icmp_type: Type of error ICMP packet
411 we are expecting (Default 11)
414 src_ip = self.nat_addr
415 self.assertEqual(packet_num, len(capture))
416 for packet in capture:
418 self.assertEqual(packet[IP].src, src_ip)
419 self.assertTrue(packet.haslayer(ICMP))
421 self.assertEqual(icmp.type, icmp_type)
422 self.assertTrue(icmp.haslayer(IPerror))
423 inner_ip = icmp[IPerror]
424 if inner_ip.haslayer(TCPerror):
425 self.assertEqual(inner_ip[TCPerror].dport,
427 elif inner_ip.haslayer(UDPerror):
428 self.assertEqual(inner_ip[UDPerror].dport,
431 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
433 self.logger.error(ppp("Unexpected or invalid packet "
434 "(outside network):", packet))
# Check ICMP error packets captured on the inside network: count, destination
# address equal to the inside host, ICMP type, presence of an embedded IPerror
# layer, and that the embedded TCP/UDP source port or ICMP id matches the
# original inside value.
# NOTE(review): the rest of the signature (the icmp_type parameter described
# below), the assignment of `icmp`, the second arguments of the
# TCPerror/UDPerror assertions and the try/except wrapper are absent from this
# listing.
437 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
440 Verify captured packets with ICMP errors on inside network
442 :param capture: Captured packets
443 :param in_if: Inside interface
444 :param packet_num: Expected number of packets (Default 3)
445 :param icmp_type: Type of error ICMP packet
446 we are expecting (Default 11)
448 self.assertEqual(packet_num, len(capture))
449 for packet in capture:
451 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
452 self.assertTrue(packet.haslayer(ICMP))
454 self.assertEqual(icmp.type, icmp_type)
455 self.assertTrue(icmp.haslayer(IPerror))
456 inner_ip = icmp[IPerror]
457 if inner_ip.haslayer(TCPerror):
458 self.assertEqual(inner_ip[TCPerror].sport,
460 elif inner_ip.haslayer(UDPerror):
461 self.assertEqual(inner_ip[UDPerror].sport,
464 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
466 self.logger.error(ppp("Unexpected or invalid packet "
467 "(inside network):", packet))
# Build a hand-fragmented IPv4/TCP stream. An unfragmented packet is built
# first only to obtain its TCP checksum, which is then placed explicitly in the
# first fragment's TCP header. Three fragments share one random IP id and use
# fragment offsets 0, 3 and 5; the first two carry the MF flag.
# NOTE(review): the payload parts of every packet, the tail of the last IP(...)
# call and the collect/return lines are absent from this listing.
# NOTE(review): the local name `id` shadows the builtin of the same name.
470 def create_stream_frag(self, src_if, dst, sport, dport, data):
472 Create fragmented packet stream
474 :param src_if: Source interface
475 :param dst: Destination IPv4 address
476 :param sport: Source TCP port
477 :param dport: Destination TCP port
478 :param data: Payload data
481 id = random.randint(0, 65535)
482 p = (IP(src=src_if.remote_ip4, dst=dst) /
483 TCP(sport=sport, dport=dport) /
485 p = p.__class__(str(p))
486 chksum = p['TCP'].chksum
488 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
489 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
490 TCP(sport=sport, dport=dport, chksum=chksum) /
493 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
494 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
495 proto=IP_PROTOS.tcp) /
498 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
499 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
# Build a fragmented IPv6/TCP stream: one packet with a fragment extension
# header (random id) is composed and split by scapy's fragment6() into
# fragments of frag_size.
# NOTE(review): the condition choosing between the two `dst_ip6` assignments
# and the payload part of the packet are absent from this listing.
505 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
506 pref=None, plen=0, frag_size=128):
508 Create fragmented packet stream
510 :param src_if: Source interface
511 :param dst: Destination IPv4 address
512 :param sport: Source TCP port
513 :param dport: Destination TCP port
514 :param data: Payload data
515 :param pref: NAT64 prefix
516 :param plen: NAT64 prefix length
517 :param frag_size: size of fragments
521 dst_ip6 = ''.join(['64:ff9b::', dst])
523 dst_ip6 = self.compose_ip6(dst, pref, plen)
525 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
526 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
527 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
528 TCP(sport=sport, dport=dport) /
531 return fragment6(p, frag_size)
# Reassemble captured IPv4 fragments: each fragment's addresses and IP checksum
# are verified and its payload is written into a buffer at byte offset
# frag * 8. A new IP header is then built from the first fragment's src, dst
# and proto, and the buffer is parsed as TCP or UDP on top of it.
# NOTE(review): the loop header over `frags` and the return statement are
# absent from this listing.
533 def reass_frags_and_verify(self, frags, src, dst):
535 Reassemble and verify fragmented packet
537 :param frags: Captured fragments
538 :param src: Source IPv4 address to verify
539 :param dst: Destination IPv4 address to verify
541 :returns: Reassembled IPv4 packet
543 buffer = StringIO.StringIO()
545 self.assertEqual(p[IP].src, src)
546 self.assertEqual(p[IP].dst, dst)
547 self.check_ip_checksum(p)
548 buffer.seek(p[IP].frag * 8)
549 buffer.write(p[IP].payload)
# NOTE(review): this assignment is overwritten by the very next statement and
# so has no effect.
550 ip = frags[0].getlayer(IP)
551 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
552 proto=frags[0][IP].proto)
553 if ip.proto == IP_PROTOS.tcp:
554 p = (ip / TCP(buffer.getvalue()))
555 self.check_tcp_checksum(p)
556 elif ip.proto == IP_PROTOS.udp:
557 p = (ip / UDP(buffer.getvalue()))
# Reassemble captured IPv6 fragments: each fragment's addresses are verified
# and its payload is written into a buffer at byte offset offset * 8 taken from
# the fragment extension header. A new IPv6 header is then built from the first
# fragment's src, dst and next-header value, and the buffer is parsed as TCP or
# UDP on top of it.
# NOTE(review): the loop header over `frags` and the return statement are
# absent from this listing.
560 def reass_frags_and_verify_ip6(self, frags, src, dst):
562 Reassemble and verify fragmented packet
564 :param frags: Captured fragments
565 :param src: Source IPv6 address to verify
566 :param dst: Destination IPv6 address to verify
568 :returns: Reassembled IPv6 packet
570 buffer = StringIO.StringIO()
572 self.assertEqual(p[IPv6].src, src)
573 self.assertEqual(p[IPv6].dst, dst)
574 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
575 buffer.write(p[IPv6ExtHdrFragment].payload)
576 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
577 nh=frags[0][IPv6ExtHdrFragment].nh)
578 if ip.nh == IP_PROTOS.tcp:
579 p = (ip / TCP(buffer.getvalue()))
580 self.check_tcp_checksum(p)
581 elif ip.nh == IP_PROTOS.udp:
582 p = (ip / UDP(buffer.getvalue()))
# Verify decoded IPFIX records for NAT44 session events: exactly 6 records are
# expected, whose field 230 is 4 or 5 (counted as create / delete, 3 each), and
# whose address, protocol and port fields match the test's inside/outside
# values. Records are indexed by numeric IPFIX field id.
# NOTE(review): the loop header over `data`, the else branches and the second
# arguments of several assertEqual calls are absent from this listing.
585 def verify_ipfix_nat44_ses(self, data):
587 Verify IPFIX NAT44 session create/delete event
589 :param data: Decoded IPFIX data records
591 nat44_ses_create_num = 0
592 nat44_ses_delete_num = 0
593 self.assertEqual(6, len(data))
596 self.assertIn(ord(record[230]), [4, 5])
597 if ord(record[230]) == 4:
598 nat44_ses_create_num += 1
600 nat44_ses_delete_num += 1
602 self.assertEqual(self.pg0.remote_ip4n, record[8])
603 # postNATSourceIPv4Address
604 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
607 self.assertEqual(struct.pack("!I", 0), record[234])
608 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
609 if IP_PROTOS.icmp == ord(record[4]):
610 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
611 self.assertEqual(struct.pack("!H", self.icmp_id_out),
613 elif IP_PROTOS.tcp == ord(record[4]):
614 self.assertEqual(struct.pack("!H", self.tcp_port_in),
616 self.assertEqual(struct.pack("!H", self.tcp_port_out),
618 elif IP_PROTOS.udp == ord(record[4]):
619 self.assertEqual(struct.pack("!H", self.udp_port_in),
621 self.assertEqual(struct.pack("!H", self.udp_port_out),
624 self.fail("Invalid protocol")
625 self.assertEqual(3, nat44_ses_create_num)
626 self.assertEqual(3, nat44_ses_delete_num)
# Verify the decoded IPFIX data for an address-exhausted event: exactly one
# record, whose field 230 equals 3 and whose field 283 is a zero 32-bit value.
# NOTE(review): the line binding `record` is absent from this listing.
628 def verify_ipfix_addr_exhausted(self, data):
630 Verify IPFIX NAT addresses event
632 :param data: Decoded IPFIX data records
634 self.assertEqual(1, len(data))
637 self.assertEqual(ord(record[230]), 3)
639 self.assertEqual(struct.pack("!I", 0), record[283])
# NAT44 test class. The class-level setup below sets the default inside/outside
# ports and ICMP ids, the NAT address and the IPFIX parameters, creates ten pg
# interfaces, and prepares three groups: pg0-pg3 (cls.interfaces), pg4-pg6
# (cls.overlapping_interfaces, placed in IP tables 10 and 20 with hand-set
# addresses) and pg9 (two remote hosts sharing one address).
# NOTE(review): the `def` lines for setUpClass/tearDownClass, the loop bodies
# and any try/except wrapper are absent from this listing.
642 class TestNAT44(MethodHolder):
643 """ NAT44 Test Cases """
647 super(TestNAT44, cls).setUpClass()
650 cls.tcp_port_in = 6303
651 cls.tcp_port_out = 6303
652 cls.udp_port_in = 6304
653 cls.udp_port_out = 6304
654 cls.icmp_id_in = 6305
655 cls.icmp_id_out = 6305
656 cls.nat_addr = '10.0.0.3'
657 cls.ipfix_src_port = 4739
658 cls.ipfix_domain_id = 1
660 cls.create_pg_interfaces(range(10))
661 cls.interfaces = list(cls.pg_interfaces[0:4])
663 for i in cls.interfaces:
668 cls.pg0.generate_remote_hosts(3)
669 cls.pg0.configure_ipv4_neighbors()
# NOTE(review): the nested list(list(...)) is redundant; one list() suffices.
671 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
672 cls.vapi.ip_table_add_del(10, is_add=1)
673 cls.vapi.ip_table_add_del(20, is_add=1)
# NOTE(review): the three `_local_ip4n` assignments below read `i.local_ip4`,
# where `i` is whatever the earlier `for i in cls.interfaces` loop left behind,
# not the interface being configured. They look like they were meant to use
# cls.pg4/pg5/pg6.local_ip4 - confirm and fix.
675 cls.pg4._local_ip4 = "172.16.255.1"
676 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
677 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
678 cls.pg4.set_table_ip4(10)
679 cls.pg5._local_ip4 = "172.17.255.3"
680 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
681 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
682 cls.pg5.set_table_ip4(10)
683 cls.pg6._local_ip4 = "172.16.255.1"
684 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
685 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
686 cls.pg6.set_table_ip4(20)
687 for i in cls.overlapping_interfaces:
695 cls.pg9.generate_remote_hosts(2)
697 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
698 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
702 cls.pg9.resolve_arp()
703 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
# NOTE(review): this chained assignment also overwrites cls.pg4._remote_ip4
# with "10.0.0.2"; confirm pg4 (rather than pg9) is the intended target.
704 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
705 cls.pg9.resolve_arp()
708 super(TestNAT44, cls).tearDownClass()
# Undo NAT44-related configuration by dumping each kind of object from VPP and
# issuing the matching API call for every entry: pg7/pg8 host routes and
# neighbors, NAT interface addresses, IPFIX logging, interface features, output
# features, static / load-balancing / identity mappings and address ranges.
# Finally the reassembly settings are reset via nat_set_reass() for IPv4 and
# IPv6, and self.ipfix_src_port / self.ipfix_domain_id return to 4739 / 1.
# NOTE(review): the trailing arguments of most API calls (presumably the
# is_add=0 / delete flags) and some loop headers are absent from this listing.
711 def clear_nat44(self):
713 Clear NAT44 configuration.
715 # I found no elegant way to do this
716 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
717 dst_address_length=32,
718 next_hop_address=self.pg7.remote_ip4n,
719 next_hop_sw_if_index=self.pg7.sw_if_index,
721 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
722 dst_address_length=32,
723 next_hop_address=self.pg8.remote_ip4n,
724 next_hop_sw_if_index=self.pg8.sw_if_index,
727 for intf in [self.pg7, self.pg8]:
728 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
730 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
735 if self.pg7.has_ip4_config:
736 self.pg7.unconfig_ip4()
738 interfaces = self.vapi.nat44_interface_addr_dump()
739 for intf in interfaces:
740 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
741 twice_nat=intf.twice_nat,
744 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
745 domain_id=self.ipfix_domain_id)
746 self.ipfix_src_port = 4739
747 self.ipfix_domain_id = 1
749 interfaces = self.vapi.nat44_interface_dump()
750 for intf in interfaces:
# An is_inside value above 1 gets an extra feature call before the common one.
# NOTE(review): presumably such an interface is both inside and outside -
# confirm against the NAT API definition.
751 if intf.is_inside > 1:
752 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
755 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
759 interfaces = self.vapi.nat44_interface_output_feature_dump()
760 for intf in interfaces:
761 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
765 static_mappings = self.vapi.nat44_static_mapping_dump()
766 for sm in static_mappings:
767 self.vapi.nat44_add_del_static_mapping(
769 sm.external_ip_address,
770 local_port=sm.local_port,
771 external_port=sm.external_port,
772 addr_only=sm.addr_only,
774 protocol=sm.protocol,
775 twice_nat=sm.twice_nat,
778 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
779 for lb_sm in lb_static_mappings:
780 self.vapi.nat44_add_del_lb_static_mapping(
785 twice_nat=lb_sm.twice_nat,
790 identity_mappings = self.vapi.nat44_identity_mapping_dump()
791 for id_m in identity_mappings:
792 self.vapi.nat44_add_del_identity_mapping(
793 addr_only=id_m.addr_only,
796 sw_if_index=id_m.sw_if_index,
798 protocol=id_m.protocol,
801 adresses = self.vapi.nat44_address_dump()
802 for addr in adresses:
803 self.vapi.nat44_add_del_address_range(addr.ip_address,
805 twice_nat=addr.twice_nat,
808 self.vapi.nat_set_reass()
809 self.vapi.nat_set_reass(is_ip6=1)
# Convenience wrapper around vapi.nat44_add_del_static_mapping: converts the
# textual local/external IPv4 addresses to packed form and forwards them with
# the remaining options.
# NOTE(review): the statement guarded by `if local_port and external_port:`
# and most of the forwarded arguments are absent from this listing; presumably
# the guard selects between address-only and address-and-port mappings.
811 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
812 local_port=0, external_port=0, vrf_id=0,
813 is_add=1, external_sw_if_index=0xFFFFFFFF,
814 proto=0, twice_nat=0):
816 Add/delete NAT44 static mapping
818 :param local_ip: Local IP address
819 :param external_ip: External IP address
820 :param local_port: Local port number (Optional)
821 :param external_port: External port number (Optional)
822 :param vrf_id: VRF ID (Default 0)
823 :param is_add: 1 if add, 0 if delete (Default add)
824 :param external_sw_if_index: External interface instead of IP address
825 :param proto: IP protocol (Mandatory if port specified)
826 :param twice_nat: 1 if translate external host address and port
829 if local_port and external_port:
831 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
832 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
833 self.vapi.nat44_add_del_static_mapping(
836 external_sw_if_index,
# Add or delete a single NAT44 pool address: the textual address is packed and
# passed as both ends of a one-address range to
# vapi.nat44_add_del_address_range.
# NOTE(review): the vrf_id parameter (default 0xFFFFFFFF) is not described in
# the docstring, and the trailing arguments of the API call are absent from
# this listing.
845 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
847 Add/delete NAT44 address
849 :param ip: IP address
850 :param is_add: 1 if add, 0 if delete (Default add)
851 :param twice_nat: twice NAT address for external hosts
853 nat_addr = socket.inet_pton(socket.AF_INET, ip)
854 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
# Dynamic translation round trip: with a NAT pool address configured and the
# NAT feature enabled on pg0 and pg1, an inside stream from pg0 is captured on
# pg1 and verified as translated, then an outside stream from pg1 is captured
# on pg0 and verified as translated back.
# NOTE(review): the trailing argument of the pg1 feature call and the steps
# between enabling capture and reading it are absent from this listing.
858 def test_dynamic(self):
859 """ NAT44 dynamic translation test """
861 self.nat44_add_address(self.nat_addr)
862 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
863 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
867 pkts = self.create_stream_in(self.pg0, self.pg1)
868 self.pg0.add_stream(pkts)
869 self.pg_enable_capture(self.pg_interfaces)
871 capture = self.pg1.get_capture(len(pkts))
872 self.verify_capture_out(capture)
875 pkts = self.create_stream_out(self.pg1)
876 self.pg1.add_stream(pkts)
877 self.pg_enable_capture(self.pg_interfaces)
879 capture = self.pg0.get_capture(len(pkts))
880 self.verify_capture_in(capture, self.pg0)
# Inside hosts send packets with TTL=1; the test expects one ICMP error
# (default type 11) back on pg0 for every packet sent.
882 def test_dynamic_icmp_errors_in2out_ttl_1(self):
883 """ NAT44 handling of client packets with TTL=1 """
885 self.nat44_add_address(self.nat_addr)
886 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
887 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
890 # Client side - generate traffic
891 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
892 self.pg0.add_stream(pkts)
893 self.pg_enable_capture(self.pg_interfaces)
896 # Client side - verify ICMP type 11 packets
897 capture = self.pg0.get_capture(len(pkts))
898 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# Sessions are first created by inside traffic; the outside host then sends
# packets with TTL=1 and the test expects ICMP errors back on pg1 whose source
# is pg1's local address (not the NAT pool address).
900 def test_dynamic_icmp_errors_out2in_ttl_1(self):
901 """ NAT44 handling of server packets with TTL=1 """
903 self.nat44_add_address(self.nat_addr)
904 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
905 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
908 # Client side - create sessions
909 pkts = self.create_stream_in(self.pg0, self.pg1)
910 self.pg0.add_stream(pkts)
911 self.pg_enable_capture(self.pg_interfaces)
914 # Server side - generate traffic
915 capture = self.pg1.get_capture(len(pkts))
916 self.verify_capture_out(capture)
917 pkts = self.create_stream_out(self.pg1, ttl=1)
918 self.pg1.add_stream(pkts)
919 self.pg_enable_capture(self.pg_interfaces)
922 # Server side - verify ICMP type 11 packets
923 capture = self.pg1.get_capture(len(pkts))
924 self.verify_capture_out_with_icmp_errors(capture,
925 src_ip=self.pg1.local_ip4)
# Inside hosts send packets with TTL=2. For each translated packet captured on
# pg1 the test fabricates an ICMP type 11 error (embedding that packet's IP
# layer) addressed to the NAT address, and expects the errors to arrive on pg0
# translated back towards the inside host.
927 def test_dynamic_icmp_errors_in2out_ttl_2(self):
928 """ NAT44 handling of error responses to client packets with TTL=2 """
930 self.nat44_add_address(self.nat_addr)
931 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
932 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
935 # Client side - generate traffic
936 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
937 self.pg0.add_stream(pkts)
938 self.pg_enable_capture(self.pg_interfaces)
941 # Server side - simulate ICMP type 11 response
942 capture = self.pg1.get_capture(len(pkts))
943 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
944 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
945 ICMP(type=11) / packet[IP] for packet in capture]
946 self.pg1.add_stream(pkts)
947 self.pg_enable_capture(self.pg_interfaces)
950 # Client side - verify ICMP type 11 packets
951 capture = self.pg0.get_capture(len(pkts))
952 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# Sessions are created by inside traffic, then the outside host sends packets
# with TTL=2. For each packet captured on pg0 the test fabricates an ICMP type
# 11 error from the inside host (embedding that packet's IP layer) and expects
# the errors to arrive on pg1 with the NAT address as source.
954 def test_dynamic_icmp_errors_out2in_ttl_2(self):
955 """ NAT44 handling of error responses to server packets with TTL=2 """
957 self.nat44_add_address(self.nat_addr)
958 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
959 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
962 # Client side - create sessions
963 pkts = self.create_stream_in(self.pg0, self.pg1)
964 self.pg0.add_stream(pkts)
965 self.pg_enable_capture(self.pg_interfaces)
968 # Server side - generate traffic
969 capture = self.pg1.get_capture(len(pkts))
970 self.verify_capture_out(capture)
971 pkts = self.create_stream_out(self.pg1, ttl=2)
972 self.pg1.add_stream(pkts)
973 self.pg_enable_capture(self.pg_interfaces)
976 # Client side - simulate ICMP type 11 response
977 capture = self.pg0.get_capture(len(pkts))
978 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
979 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
980 ICMP(type=11) / packet[IP] for packet in capture]
981 self.pg0.add_stream(pkts)
982 self.pg_enable_capture(self.pg_interfaces)
985 # Server side - verify ICMP type 11 packets
986 capture = self.pg1.get_capture(len(pkts))
987 self.verify_capture_out_with_icmp_errors(capture)
# The outside host pings pg1's own local address; exactly one echo reply
# (ICMP type 0) is expected back on pg1, with addresses swapped.
# NOTE(review): the assignments of `pkts` and `packet` and the try/except
# wrapper around the logger.error call are absent from this listing.
# NOTE(review): the request is built with self.icmp_id_out but the reply is
# compared against self.icmp_id_in; this only holds while both ids are equal
# (the class defaults are both 6305).
989 def test_ping_out_interface_from_outside(self):
990 """ Ping NAT44 out interface from outside network """
992 self.nat44_add_address(self.nat_addr)
993 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
994 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
997 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
998 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
999 ICMP(id=self.icmp_id_out, type='echo-request'))
1001 self.pg1.add_stream(pkts)
1002 self.pg_enable_capture(self.pg_interfaces)
1004 capture = self.pg1.get_capture(len(pkts))
1005 self.assertEqual(1, len(capture))
1008 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1009 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1010 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1011 self.assertEqual(packet[ICMP].type, 0) # echo reply
1013 self.logger.error(ppp("Unexpected or invalid packet "
1014 "(outside network):", packet))
# With an address-only static mapping from the pg0 host to the NAT address, an
# echo request sent from outside to the NAT address must reach the inside host,
# and the host's echo reply must leave pg1 with the id untranslated
# (same_port=True). Both captured packets are also checked to be ICMP.
1017 def test_ping_internal_host_from_outside(self):
1018 """ Ping internal host from outside network """
1020 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1021 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1022 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1026 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1027 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1028 ICMP(id=self.icmp_id_out, type='echo-request'))
1029 self.pg1.add_stream(pkt)
1030 self.pg_enable_capture(self.pg_interfaces)
1032 capture = self.pg0.get_capture(1)
1033 self.verify_capture_in(capture, self.pg0, packet_num=1)
1034 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1037 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1038 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1039 ICMP(id=self.icmp_id_in, type='echo-reply'))
1040 self.pg0.add_stream(pkt)
1041 self.pg_enable_capture(self.pg_interfaces)
1043 capture = self.pg1.get_capture(1)
1044 self.verify_capture_out(capture, same_port=True, packet_num=1)
1045 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Address-only static mapping (pg0 host <-> 10.0.0.10), traffic initiated from
# inside: outgoing packets must carry nat_ip with ports/ids unchanged
# (same_port=True), and packets sent from outside to nat_ip must reach the
# inside host. The outside ports/ids are reset to the inside values first
# because create_stream_out addresses packets to them.
1047 def test_static_in(self):
1048 """ 1:1 NAT initialized from inside network """
1050 nat_ip = "10.0.0.10"
1051 self.tcp_port_out = 6303
1052 self.udp_port_out = 6304
1053 self.icmp_id_out = 6305
1055 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1056 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1057 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1061 pkts = self.create_stream_in(self.pg0, self.pg1)
1062 self.pg0.add_stream(pkts)
1063 self.pg_enable_capture(self.pg_interfaces)
1065 capture = self.pg1.get_capture(len(pkts))
1066 self.verify_capture_out(capture, nat_ip, True)
1069 pkts = self.create_stream_out(self.pg1, nat_ip)
1070 self.pg1.add_stream(pkts)
1071 self.pg_enable_capture(self.pg_interfaces)
1073 capture = self.pg0.get_capture(len(pkts))
1074 self.verify_capture_in(capture, self.pg0)
# Address-only static mapping (pg0 host <-> 10.0.0.20), traffic initiated from
# outside: packets sent to nat_ip must reach the inside host, and the host's
# subsequent traffic must leave with nat_ip and unchanged ports/ids
# (same_port=True).
1076 def test_static_out(self):
1077 """ 1:1 NAT initialized from outside network """
1079 nat_ip = "10.0.0.20"
1080 self.tcp_port_out = 6303
1081 self.udp_port_out = 6304
1082 self.icmp_id_out = 6305
1084 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1085 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1086 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1090 pkts = self.create_stream_out(self.pg1, nat_ip)
1091 self.pg1.add_stream(pkts)
1092 self.pg_enable_capture(self.pg_interfaces)
1094 capture = self.pg0.get_capture(len(pkts))
1095 self.verify_capture_in(capture, self.pg0)
1098 pkts = self.create_stream_in(self.pg0, self.pg1)
1099 self.pg0.add_stream(pkts)
1100 self.pg_enable_capture(self.pg_interfaces)
1102 capture = self.pg1.get_capture(len(pkts))
1103 self.verify_capture_out(capture, nat_ip, True)
# Per-protocol static mappings with ports (TCP, UDP and ICMP id) between the
# pg0 host and the NAT address, traffic initiated from inside: outgoing packets
# are verified with the default same_port=False, i.e. the outside port/id must
# differ from the inside one, then the reverse direction is verified.
1105 def test_static_with_port_in(self):
1106 """ 1:1 NAPT initialized from inside network """
1108 self.tcp_port_out = 3606
1109 self.udp_port_out = 3607
1110 self.icmp_id_out = 3608
1112 self.nat44_add_address(self.nat_addr)
1113 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1114 self.tcp_port_in, self.tcp_port_out,
1115 proto=IP_PROTOS.tcp)
1116 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1117 self.udp_port_in, self.udp_port_out,
1118 proto=IP_PROTOS.udp)
1119 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1120 self.icmp_id_in, self.icmp_id_out,
1121 proto=IP_PROTOS.icmp)
1122 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1123 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1127 pkts = self.create_stream_in(self.pg0, self.pg1)
1128 self.pg0.add_stream(pkts)
1129 self.pg_enable_capture(self.pg_interfaces)
1131 capture = self.pg1.get_capture(len(pkts))
1132 self.verify_capture_out(capture)
1135 pkts = self.create_stream_out(self.pg1)
1136 self.pg1.add_stream(pkts)
1137 self.pg_enable_capture(self.pg_interfaces)
1139 capture = self.pg0.get_capture(len(pkts))
1140 self.verify_capture_in(capture, self.pg0)
1142 def test_static_with_port_out(self):
1143 """ 1:1 NAPT initialized from outside network """
# Same configuration shape as the inside-initiated NAPT test (pool address
# plus per-protocol static mappings), but the first stream is injected on pg1.
1145 self.tcp_port_out = 30606
1146 self.udp_port_out = 30607
1147 self.icmp_id_out = 30608
1149 self.nat44_add_address(self.nat_addr)
# One static mapping per protocol: TCP, UDP, ICMP.
1150 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1151 self.tcp_port_in, self.tcp_port_out,
1152 proto=IP_PROTOS.tcp)
1153 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1154 self.udp_port_in, self.udp_port_out,
1155 proto=IP_PROTOS.udp)
1156 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1157 self.icmp_id_in, self.icmp_id_out,
1158 proto=IP_PROTOS.icmp)
1159 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1160 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg1 -> pg0 first: every packet sent must show up on pg0.
1164 pkts = self.create_stream_out(self.pg1)
1165 self.pg1.add_stream(pkts)
1166 self.pg_enable_capture(self.pg_interfaces)
1168 capture = self.pg0.get_capture(len(pkts))
1169 self.verify_capture_in(capture, self.pg0)
# pg0 -> pg1 afterwards.
1172 pkts = self.create_stream_in(self.pg0, self.pg1)
1173 self.pg0.add_stream(pkts)
1174 self.pg_enable_capture(self.pg_interfaces)
1176 capture = self.pg1.get_capture(len(pkts))
1177 self.verify_capture_out(capture)
1179 def test_static_vrf_aware(self):
1180 """ 1:1 NAT VRF awareness """
# Creates one static mapping for pg4's remote address (nat_ip1) and one for
# pg0's remote address (nat_ip2), enables NAT44 on pg3, pg0 and pg4, then
# checks that pg4 traffic is translated to nat_ip1 while pg0 traffic produces
# no packets on pg3.
1182 nat_ip1 = "10.0.0.30"
1183 nat_ip2 = "10.0.0.40"
1184 self.tcp_port_out = 6303
1185 self.udp_port_out = 6304
1186 self.icmp_id_out = 6305
1188 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1190 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1192 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1194 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1195 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1197 # inside interface VRF match NAT44 static mapping VRF
1198 pkts = self.create_stream_in(self.pg4, self.pg3)
1199 self.pg4.add_stream(pkts)
1200 self.pg_enable_capture(self.pg_interfaces)
1202 capture = self.pg3.get_capture(len(pkts))
1203 self.verify_capture_out(capture, nat_ip1, True)
1205 # inside interface VRF don't match NAT44 static mapping VRF (packets
1207 pkts = self.create_stream_in(self.pg0, self.pg3)
1208 self.pg0.add_stream(pkts)
1209 self.pg_enable_capture(self.pg_interfaces)
# Nothing may reach pg3 for the pg0-originated stream.
1211 self.pg3.assert_nothing_captured()
1213 def test_identity_nat(self):
1214 """ Identity NAT """
# Installs an identity mapping for pg0's remote address, sends one TCP packet
# from pg1's remote host to that address and expects exactly one packet on
# pg0 whose addresses and ports are unchanged.
1216 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1217 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1218 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1221 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1222 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1223 TCP(sport=12345, dport=56789))
1224 self.pg1.add_stream(p)
1225 self.pg_enable_capture(self.pg_interfaces)
1227 capture = self.pg0.get_capture(1)
# Destination/source addresses and ports must equal the values that were sent.
1232 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1233 self.assertEqual(ip.src, self.pg1.remote_ip4)
1234 self.assertEqual(tcp.dport, 56789)
1235 self.assertEqual(tcp.sport, 12345)
1236 self.check_tcp_checksum(p)
1237 self.check_ip_checksum(p)
# Log the packet through ppp() for diagnosis.
1239 self.logger.error(ppp("Unexpected or invalid packet:", p))
1242 def test_static_lb(self):
1243 """ NAT44 local service load balancing """
# Configures a load-balanced static mapping on nat_addr backed by two hosts
# behind pg0, then checks client -> service, service -> client and finally
# how a batch of client flows is spread over the two servers.
# external_addr_n is nat_addr packed to binary form with inet_pton.
1244 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1247 server1 = self.pg0.remote_hosts[0]
1248 server2 = self.pg0.remote_hosts[1]
# NOTE(review): the local name `locals` shadows the Python builtin.
1250 locals = [{'addr': server1.ip4n,
1253 {'addr': server2.ip4n,
1257 self.nat44_add_address(self.nat_addr)
1258 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1261 local_num=len(locals),
1263 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1264 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1267 # from client to service
1268 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1269 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1270 TCP(sport=12345, dport=external_port))
1271 self.pg1.add_stream(p)
1272 self.pg_enable_capture(self.pg_interfaces)
1274 capture = self.pg0.get_capture(1)
# The packet must have been steered to one of the two backing servers.
1280 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1281 if ip.dst == server1.ip4:
1285 self.assertEqual(tcp.dport, local_port)
1286 self.check_tcp_checksum(p)
1287 self.check_ip_checksum(p)
1289 self.logger.error(ppp("Unexpected or invalid packet:", p))
1292 # from service back to client
1293 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1294 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1295 TCP(sport=local_port, dport=12345))
1296 self.pg0.add_stream(p)
1297 self.pg_enable_capture(self.pg_interfaces)
1299 capture = self.pg1.get_capture(1)
# The reply must appear to come from the external address and port.
1304 self.assertEqual(ip.src, self.nat_addr)
1305 self.assertEqual(tcp.sport, external_port)
1306 self.check_tcp_checksum(p)
1307 self.check_ip_checksum(p)
1309 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Distribution check: one flow per client address produced by ip4_range.
1315 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1317 for client in clients:
1318 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1319 IP(src=client, dst=self.nat_addr) /
1320 TCP(sport=12345, dport=external_port))
1322 self.pg1.add_stream(pkts)
1323 self.pg_enable_capture(self.pg_interfaces)
1325 capture = self.pg0.get_capture(len(pkts))
1327 if p[IP].dst == server1.ip4:
# server1 is expected to have received strictly more flows than server2.
1331 self.assertTrue(server1_n > server2_n)
1333 def test_multiple_inside_interfaces(self):
1334 """ NAT44 multiple non-overlapping address space inside interfaces """
# Enables NAT44 on pg0, pg1 and pg3 with nat_addr in the pool (pg2 gets no
# NAT44 feature here), then walks through untranslated paths (pg0 -> pg1,
# pg0 -> pg2) and translated paths via pg3 for both pg0 and pg1.
1336 self.nat44_add_address(self.nat_addr)
1337 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1338 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1339 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1342 # between two NAT44 inside interfaces (no translation)
1343 pkts = self.create_stream_in(self.pg0, self.pg1)
1344 self.pg0.add_stream(pkts)
1345 self.pg_enable_capture(self.pg_interfaces)
1347 capture = self.pg1.get_capture(len(pkts))
1348 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1350 # from NAT44 inside to interface without NAT44 feature (no translation)
1351 pkts = self.create_stream_in(self.pg0, self.pg2)
1352 self.pg0.add_stream(pkts)
1353 self.pg_enable_capture(self.pg_interfaces)
1355 capture = self.pg2.get_capture(len(pkts))
1356 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1358 # in2out 1st interface
1359 pkts = self.create_stream_in(self.pg0, self.pg3)
1360 self.pg0.add_stream(pkts)
1361 self.pg_enable_capture(self.pg_interfaces)
1363 capture = self.pg3.get_capture(len(pkts))
1364 self.verify_capture_out(capture)
1366 # out2in 1st interface
1367 pkts = self.create_stream_out(self.pg3)
1368 self.pg3.add_stream(pkts)
1369 self.pg_enable_capture(self.pg_interfaces)
1371 capture = self.pg0.get_capture(len(pkts))
1372 self.verify_capture_in(capture, self.pg0)
1374 # in2out 2nd interface
1375 pkts = self.create_stream_in(self.pg1, self.pg3)
1376 self.pg1.add_stream(pkts)
1377 self.pg_enable_capture(self.pg_interfaces)
1379 capture = self.pg3.get_capture(len(pkts))
1380 self.verify_capture_out(capture)
1382 # out2in 2nd interface
1383 pkts = self.create_stream_out(self.pg3)
1384 self.pg3.add_stream(pkts)
1385 self.pg_enable_capture(self.pg_interfaces)
1387 capture = self.pg1.get_capture(len(pkts))
1388 self.verify_capture_in(capture, self.pg1)
1390 def test_inside_overlapping_interfaces(self):
1391 """ NAT44 multiple inside interfaces with overlapping address space """
# Enables NAT44 on pg3, pg4, pg5 and pg6 with nat_addr in the pool and a
# static mapping for pg6's remote address (static_nat_ip). Exercises
# pg4 <-> pg5 without translation, pg4 -> pg6 hairpinning through the static
# address, translated traffic via pg3 for pg4, pg5 and pg6, and finally the
# user/session dump APIs.
1393 static_nat_ip = "10.0.0.10"
1394 self.nat44_add_address(self.nat_addr)
1395 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1397 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1398 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1399 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1400 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1403 # between NAT44 inside interfaces with same VRF (no translation)
1404 pkts = self.create_stream_in(self.pg4, self.pg5)
1405 self.pg4.add_stream(pkts)
1406 self.pg_enable_capture(self.pg_interfaces)
1408 capture = self.pg5.get_capture(len(pkts))
1409 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1411 # between NAT44 inside interfaces with different VRF (hairpinning)
1412 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1413 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1414 TCP(sport=1234, dport=5678))
1415 self.pg4.add_stream(p)
1416 self.pg_enable_capture(self.pg_interfaces)
1418 capture = self.pg6.get_capture(1)
# Source must be rewritten to nat_addr with a different source port, while
# the destination becomes pg6's real address and keeps port 5678.
1423 self.assertEqual(ip.src, self.nat_addr)
1424 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1425 self.assertNotEqual(tcp.sport, 1234)
1426 self.assertEqual(tcp.dport, 5678)
1428 self.logger.error(ppp("Unexpected or invalid packet:", p))
1431 # in2out 1st interface
1432 pkts = self.create_stream_in(self.pg4, self.pg3)
1433 self.pg4.add_stream(pkts)
1434 self.pg_enable_capture(self.pg_interfaces)
1436 capture = self.pg3.get_capture(len(pkts))
1437 self.verify_capture_out(capture)
1439 # out2in 1st interface
1440 pkts = self.create_stream_out(self.pg3)
1441 self.pg3.add_stream(pkts)
1442 self.pg_enable_capture(self.pg_interfaces)
1444 capture = self.pg4.get_capture(len(pkts))
1445 self.verify_capture_in(capture, self.pg4)
1447 # in2out 2nd interface
1448 pkts = self.create_stream_in(self.pg5, self.pg3)
1449 self.pg5.add_stream(pkts)
1450 self.pg_enable_capture(self.pg_interfaces)
1452 capture = self.pg3.get_capture(len(pkts))
1453 self.verify_capture_out(capture)
1455 # out2in 2nd interface
1456 pkts = self.create_stream_out(self.pg3)
1457 self.pg3.add_stream(pkts)
1458 self.pg_enable_capture(self.pg_interfaces)
1460 capture = self.pg5.get_capture(len(pkts))
1461 self.verify_capture_in(capture, self.pg5)
# Session dump for pg5's remote address (second argument 10): exactly three
# non-static sessions are expected, in TCP, UDP, ICMP order, each using the
# single pool address and the *_in / *_out ports stored on the test case.
1464 addresses = self.vapi.nat44_address_dump()
1465 self.assertEqual(len(addresses), 1)
1466 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1467 self.assertEqual(len(sessions), 3)
1468 for session in sessions:
1469 self.assertFalse(session.is_static)
1470 self.assertEqual(session.inside_ip_address[0:4],
1471 self.pg5.remote_ip4n)
1472 self.assertEqual(session.outside_ip_address,
1473 addresses[0].ip_address)
1474 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1475 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1476 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1477 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1478 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1479 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1480 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1481 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1482 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1484 # in2out 3rd interface
1485 pkts = self.create_stream_in(self.pg6, self.pg3)
1486 self.pg6.add_stream(pkts)
1487 self.pg_enable_capture(self.pg_interfaces)
1489 capture = self.pg3.get_capture(len(pkts))
1490 self.verify_capture_out(capture, static_nat_ip, True)
1492 # out2in 3rd interface
1493 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1494 self.pg3.add_stream(pkts)
1495 self.pg_enable_capture(self.pg_interfaces)
1497 capture = self.pg6.get_capture(len(pkts))
1498 self.verify_capture_in(capture, self.pg6)
1500 # general user and session dump verifications
1501 users = self.vapi.nat44_user_dump()
1502 self.assertTrue(len(users) >= 3)
1503 addresses = self.vapi.nat44_address_dump()
1504 self.assertEqual(len(addresses), 1)
1506 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1508 for session in sessions:
1509 self.assertEqual(user.ip_address, session.inside_ip_address)
# Chained comparison: total_bytes > total_pkts and total_pkts > 0.
1510 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1511 self.assertTrue(session.protocol in
1512 [IP_PROTOS.tcp, IP_PROTOS.udp,
# Sessions of pg4's remote address: at least four, none static, all using the
# single pool address.
1516 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1517 self.assertTrue(len(sessions) >= 4)
1518 for session in sessions:
1519 self.assertFalse(session.is_static)
1520 self.assertEqual(session.inside_ip_address[0:4],
1521 self.pg4.remote_ip4n)
1522 self.assertEqual(session.outside_ip_address,
1523 addresses[0].ip_address)
# Sessions of pg6's remote address (second argument 20): at least three, all
# static, with static_nat_ip as the outside address (compared byte by byte).
1526 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1527 self.assertTrue(len(sessions) >= 3)
1528 for session in sessions:
1529 self.assertTrue(session.is_static)
1530 self.assertEqual(session.inside_ip_address[0:4],
1531 self.pg6.remote_ip4n)
1532 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1533 map(int, static_nat_ip.split('.')))
1534 self.assertTrue(session.inside_port in
1535 [self.tcp_port_in, self.udp_port_in,
1538 def test_hairpinning(self):
1539 """ NAT44 hairpinning - 1:1 NAPT """
1541 host = self.pg0.remote_hosts[0]
1542 server = self.pg0.remote_hosts[1]
1545 server_in_port = 5678
1546 server_out_port = 8765
1548 self.nat44_add_address(self.nat_addr)
1549 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1550 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1552 # add static mapping for server
1553 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1554 server_in_port, server_out_port,
1555 proto=IP_PROTOS.tcp)
1557 # send packet from host to server
1558 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1559 IP(src=host.ip4, dst=self.nat_addr) /
1560 TCP(sport=host_in_port, dport=server_out_port))
1561 self.pg0.add_stream(p)
1562 self.pg_enable_capture(self.pg_interfaces)
1564 capture = self.pg0.get_capture(1)
1569 self.assertEqual(ip.src, self.nat_addr)
1570 self.assertEqual(ip.dst, server.ip4)
1571 self.assertNotEqual(tcp.sport, host_in_port)
1572 self.assertEqual(tcp.dport, server_in_port)
1573 self.check_tcp_checksum(p)
1574 host_out_port = tcp.sport
1576 self.logger.error(ppp("Unexpected or invalid packet:", p))
1579 # send reply from server to host
1580 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1581 IP(src=server.ip4, dst=self.nat_addr) /
1582 TCP(sport=server_in_port, dport=host_out_port))
1583 self.pg0.add_stream(p)
1584 self.pg_enable_capture(self.pg_interfaces)
1586 capture = self.pg0.get_capture(1)
1591 self.assertEqual(ip.src, self.nat_addr)
1592 self.assertEqual(ip.dst, host.ip4)
1593 self.assertEqual(tcp.sport, server_out_port)
1594 self.assertEqual(tcp.dport, host_in_port)
1595 self.check_tcp_checksum(p)
1597 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1600 def test_hairpinning2(self):
1601 """ NAT44 hairpinning - 1:1 NAT"""
# Host, server1 and server2 are all remote hosts on pg0; each server has an
# address-only static mapping. Four exchanges are checked, each with TCP, UDP
# and ICMP packets sent and captured on pg0:
#   host -> server1, server1 -> host, server2 -> server1, server1 -> server2.
1603 server1_nat_ip = "10.0.0.10"
1604 server2_nat_ip = "10.0.0.11"
1605 host = self.pg0.remote_hosts[0]
1606 server1 = self.pg0.remote_hosts[1]
1607 server2 = self.pg0.remote_hosts[2]
1608 server_tcp_port = 22
1609 server_udp_port = 20
1611 self.nat44_add_address(self.nat_addr)
1612 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1613 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1616 # add static mapping for servers
1617 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1618 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# host -> server1 (addressed to server1's NAT address)
1622 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1623 IP(src=host.ip4, dst=server1_nat_ip) /
1624 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1626 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1627 IP(src=host.ip4, dst=server1_nat_ip) /
1628 UDP(sport=self.udp_port_in, dport=server_udp_port))
1630 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1631 IP(src=host.ip4, dst=server1_nat_ip) /
1632 ICMP(id=self.icmp_id_in, type='echo-request'))
1634 self.pg0.add_stream(pkts)
1635 self.pg_enable_capture(self.pg_interfaces)
1637 capture = self.pg0.get_capture(len(pkts))
# Expected: source rewritten to nat_addr with a new port/id, destination
# rewritten to server1's real address. The translated source port/id is
# recorded for the reply stream below.
1638 for packet in capture:
1640 self.assertEqual(packet[IP].src, self.nat_addr)
1641 self.assertEqual(packet[IP].dst, server1.ip4)
1642 if packet.haslayer(TCP):
1643 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1644 self.assertEqual(packet[TCP].dport, server_tcp_port)
1645 self.tcp_port_out = packet[TCP].sport
1646 self.check_tcp_checksum(packet)
1647 elif packet.haslayer(UDP):
1648 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1649 self.assertEqual(packet[UDP].dport, server_udp_port)
1650 self.udp_port_out = packet[UDP].sport
1652 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1653 self.icmp_id_out = packet[ICMP].id
1655 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server1 -> host (replies addressed to nat_addr and the recorded port/id)
1660 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1661 IP(src=server1.ip4, dst=self.nat_addr) /
1662 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1664 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1665 IP(src=server1.ip4, dst=self.nat_addr) /
1666 UDP(sport=server_udp_port, dport=self.udp_port_out))
1668 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1669 IP(src=server1.ip4, dst=self.nat_addr) /
1670 ICMP(id=self.icmp_id_out, type='echo-reply'))
1672 self.pg0.add_stream(pkts)
1673 self.pg_enable_capture(self.pg_interfaces)
1675 capture = self.pg0.get_capture(len(pkts))
# Expected: source shown as server1's NAT address, destination restored to
# the host's address and original port/id.
1676 for packet in capture:
1678 self.assertEqual(packet[IP].src, server1_nat_ip)
1679 self.assertEqual(packet[IP].dst, host.ip4)
1680 if packet.haslayer(TCP):
1681 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1682 self.assertEqual(packet[TCP].sport, server_tcp_port)
1683 self.check_tcp_checksum(packet)
1684 elif packet.haslayer(UDP):
1685 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1686 self.assertEqual(packet[UDP].sport, server_udp_port)
1688 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1690 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1693 # server2 to server1
1695 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1696 IP(src=server2.ip4, dst=server1_nat_ip) /
1697 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1699 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1700 IP(src=server2.ip4, dst=server1_nat_ip) /
1701 UDP(sport=self.udp_port_in, dport=server_udp_port))
1703 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1704 IP(src=server2.ip4, dst=server1_nat_ip) /
1705 ICMP(id=self.icmp_id_in, type='echo-request'))
1707 self.pg0.add_stream(pkts)
1708 self.pg_enable_capture(self.pg_interfaces)
1710 capture = self.pg0.get_capture(len(pkts))
# Expected: source shown as server2's NAT address with the source port/id
# unchanged (asserted equal to the *_in values), destination rewritten to
# server1's real address.
1711 for packet in capture:
1713 self.assertEqual(packet[IP].src, server2_nat_ip)
1714 self.assertEqual(packet[IP].dst, server1.ip4)
1715 if packet.haslayer(TCP):
1716 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1717 self.assertEqual(packet[TCP].dport, server_tcp_port)
1718 self.tcp_port_out = packet[TCP].sport
1719 self.check_tcp_checksum(packet)
1720 elif packet.haslayer(UDP):
1721 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1722 self.assertEqual(packet[UDP].dport, server_udp_port)
1723 self.udp_port_out = packet[UDP].sport
1725 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1726 self.icmp_id_out = packet[ICMP].id
1728 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1731 # server1 to server2
1733 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1734 IP(src=server1.ip4, dst=server2_nat_ip) /
1735 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1737 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1738 IP(src=server1.ip4, dst=server2_nat_ip) /
1739 UDP(sport=server_udp_port, dport=self.udp_port_out))
1741 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1742 IP(src=server1.ip4, dst=server2_nat_ip) /
1743 ICMP(id=self.icmp_id_out, type='echo-reply'))
1745 self.pg0.add_stream(pkts)
1746 self.pg_enable_capture(self.pg_interfaces)
1748 capture = self.pg0.get_capture(len(pkts))
# Expected: source shown as server1's NAT address, destination rewritten to
# server2's real address with the original port/id.
1749 for packet in capture:
1751 self.assertEqual(packet[IP].src, server1_nat_ip)
1752 self.assertEqual(packet[IP].dst, server2.ip4)
1753 if packet.haslayer(TCP):
1754 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1755 self.assertEqual(packet[TCP].sport, server_tcp_port)
1756 self.check_tcp_checksum(packet)
1757 elif packet.haslayer(UDP):
1758 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1759 self.assertEqual(packet[UDP].sport, server_udp_port)
1761 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1763 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1766 def test_max_translations_per_user(self):
1767 """ MAX translations per user - recycle the least recently used """
# Reads max_translations_per_user from nat_show_config, sends five more TCP
# packets than that limit from a single source address (one distinct source
# port per packet) and expects every packet to arrive on pg1.
1769 self.nat44_add_address(self.nat_addr)
1770 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1771 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1774 # get maximum number of translations per user
1775 nat44_config = self.vapi.nat_show_config()
1777 # send more than maximum number of translations per user packets
1778 pkts_num = nat44_config.max_translations_per_user + 5
# Source ports run upward from 1025, one per packet.
1780 for port in range(0, pkts_num):
1781 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1782 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1783 TCP(sport=1025 + port))
1785 self.pg0.add_stream(pkts)
1786 self.pg_enable_capture(self.pg_interfaces)
1789 # verify number of translated packet
1790 self.pg1.get_capture(pkts_num)
1792 def test_interface_addr(self):
1793 """ Acquire NAT44 addresses from interface """
# Registers pg7 via nat44_add_interface_addr and checks that the NAT address
# dump follows pg7's IPv4 configuration: empty before config_ip4(), one entry
# equal to pg7.local_ip4n after it, empty again after unconfig_ip4().
1794 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1796 # no address in NAT pool
1797 adresses = self.vapi.nat44_address_dump()
1798 self.assertEqual(0, len(adresses))
1800 # configure interface address and check NAT address pool
1801 self.pg7.config_ip4()
1802 adresses = self.vapi.nat44_address_dump()
1803 self.assertEqual(1, len(adresses))
# Only the first four bytes of the dumped address field are compared.
1804 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1806 # remove interface address and check NAT address pool
1807 self.pg7.unconfig_ip4()
1808 adresses = self.vapi.nat44_address_dump()
1809 self.assertEqual(0, len(adresses))
1811 def test_interface_addr_static_mapping(self):
1812 """ Static mapping with addresses from interface """
# Creates a static mapping whose external side is given as an interface
# (pg7) and checks the static mapping dump before pg7 has an address, after
# config_ip4() and after unconfig_ip4().
1813 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1814 self.nat44_add_static_mapping(
1816 external_sw_if_index=self.pg7.sw_if_index)
1818 # static mappings with external interface
1819 static_mappings = self.vapi.nat44_static_mapping_dump()
1820 self.assertEqual(1, len(static_mappings))
1821 self.assertEqual(self.pg7.sw_if_index,
1822 static_mappings[0].external_sw_if_index)
1824 # configure interface address and check static mappings
1825 self.pg7.config_ip4()
1826 static_mappings = self.vapi.nat44_static_mapping_dump()
1827 self.assertEqual(1, len(static_mappings))
1828 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1829 self.pg7.local_ip4n)
# Once the address is configured the dump is expected to carry pg7's address
# and 0xFFFFFFFF in place of the interface index.
1830 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1832 # remove interface address and check static mappings
1833 self.pg7.unconfig_ip4()
1834 static_mappings = self.vapi.nat44_static_mapping_dump()
1835 self.assertEqual(0, len(static_mappings))
1837 def test_interface_addr_identity_nat(self):
1838 """ Identity NAT with addresses from interface """
# Creates a TCP identity mapping keyed on an interface (pg7) and checks the
# identity mapping dump before pg7 has an address, after config_ip4() and
# after unconfig_ip4().
1841 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1842 self.vapi.nat44_add_del_identity_mapping(
1843 sw_if_index=self.pg7.sw_if_index,
1845 protocol=IP_PROTOS.tcp,
1848 # identity mappings with external interface
1849 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1850 self.assertEqual(1, len(identity_mappings))
1851 self.assertEqual(self.pg7.sw_if_index,
1852 identity_mappings[0].sw_if_index)
1854 # configure interface address and check identity mappings
1855 self.pg7.config_ip4()
1856 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1857 self.assertEqual(1, len(identity_mappings))
1858 self.assertEqual(identity_mappings[0].ip_address,
1859 self.pg7.local_ip4n)
# Once the address is configured the dump is expected to carry pg7's address,
# 0xFFFFFFFF in place of the interface index, and the same port and protocol.
1860 self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
1861 self.assertEqual(port, identity_mappings[0].port)
1862 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
1864 # remove interface address and check identity mappings
1865 self.pg7.unconfig_ip4()
1866 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1867 self.assertEqual(0, len(identity_mappings))
1869 def test_ipfix_nat44_sess(self):
1870 """ IPFIX logging NAT44 session created/delted """
# Points the IPFIX exporter at pg3's remote address on a non-default
# collector port, enables NAT IPFIX logging, creates sessions with a
# pg0 -> pg1 stream, removes the pool address, flushes IPFIX and then checks
# the three export packets captured on pg3.
1871 self.ipfix_domain_id = 10
1872 self.ipfix_src_port = 20202
1873 colector_port = 30303
# Teach scapy to dissect UDP to this port as IPFIX.
# NOTE(review): the literal 30303 duplicates colector_port above.
1874 bind_layers(UDP, IPFIX, dport=30303)
1875 self.nat44_add_address(self.nat_addr)
1876 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1877 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1879 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1880 src_address=self.pg3.local_ip4n,
1882 template_interval=10,
1883 collector_port=colector_port)
1884 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1885 src_port=self.ipfix_src_port)
1887 pkts = self.create_stream_in(self.pg0, self.pg1)
1888 self.pg0.add_stream(pkts)
1889 self.pg_enable_capture(self.pg_interfaces)
1891 capture = self.pg1.get_capture(len(pkts))
1892 self.verify_capture_out(capture)
# Remove the pool address again before flushing the exporter.
1893 self.nat44_add_address(self.nat_addr, is_add=0)
1894 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1895 capture = self.pg3.get_capture(3)
1896 ipfix = IPFIXDecoder()
1897 # first load template
# Every export packet must come from pg3's local address and the configured
# source port, go to the collector address/port, and carry the configured
# observation domain id.
1899 self.assertTrue(p.haslayer(IPFIX))
1900 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1901 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1902 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1903 self.assertEqual(p[UDP].dport, colector_port)
1904 self.assertEqual(p[IPFIX].observationDomainID,
1905 self.ipfix_domain_id)
1906 if p.haslayer(Template):
1907 ipfix.add_template(p.getlayer(Template))
1908 # verify events in data set
1910 if p.haslayer(Data):
1911 data = ipfix.decode_data_set(p.getlayer(Set))
1912 self.verify_ipfix_nat44_ses(data)
1914 def test_ipfix_addr_exhausted(self):
1915 """ IPFIX logging NAT addresses exhausted """
# Enables NAT44 on pg0/pg1 without adding any pool address in this method,
# sends one packet from pg0 that must not be forwarded to pg1, flushes IPFIX
# and checks the three export packets captured on pg3.
1916 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1917 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# No collector_port appears in the visible exporter arguments; the export is
# expected on UDP port 4739 (asserted below).
1919 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1920 src_address=self.pg3.local_ip4n,
1922 template_interval=10)
1923 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1924 src_port=self.ipfix_src_port)
1926 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1927 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1929 self.pg0.add_stream(p)
1930 self.pg_enable_capture(self.pg_interfaces)
# Zero packets are expected on pg1.
1932 capture = self.pg1.get_capture(0)
1933 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1934 capture = self.pg3.get_capture(3)
1935 ipfix = IPFIXDecoder()
1936 # first load template
1938 self.assertTrue(p.haslayer(IPFIX))
1939 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1940 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1941 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1942 self.assertEqual(p[UDP].dport, 4739)
1943 self.assertEqual(p[IPFIX].observationDomainID,
1944 self.ipfix_domain_id)
1945 if p.haslayer(Template):
1946 ipfix.add_template(p.getlayer(Template))
1947 # verify events in data set
1949 if p.haslayer(Data):
1950 data = ipfix.decode_data_set(p.getlayer(Set))
1951 self.verify_ipfix_addr_exhausted(data)
1953 def test_pool_addr_fib(self):
1954 """ NAT44 add pool addresses to FIB """
1955 static_addr = '10.0.0.10'
1956 self.nat44_add_address(self.nat_addr)
1957 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1958 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1960 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1963 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1964 ARP(op=ARP.who_has, pdst=self.nat_addr,
1965 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1966 self.pg1.add_stream(p)
1967 self.pg_enable_capture(self.pg_interfaces)
1969 capture = self.pg1.get_capture(1)
1970 self.assertTrue(capture[0].haslayer(ARP))
1971 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1974 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1975 ARP(op=ARP.who_has, pdst=static_addr,
1976 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1977 self.pg1.add_stream(p)
1978 self.pg_enable_capture(self.pg_interfaces)
1980 capture = self.pg1.get_capture(1)
1981 self.assertTrue(capture[0].haslayer(ARP))
1982 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1984 # send ARP to non-NAT44 interface
1985 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1986 ARP(op=ARP.who_has, pdst=self.nat_addr,
1987 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1988 self.pg2.add_stream(p)
1989 self.pg_enable_capture(self.pg_interfaces)
1991 capture = self.pg1.get_capture(0)
1993 # remove addresses and verify
1994 self.nat44_add_address(self.nat_addr, is_add=0)
1995 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1998 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1999 ARP(op=ARP.who_has, pdst=self.nat_addr,
2000 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2001 self.pg1.add_stream(p)
2002 self.pg_enable_capture(self.pg_interfaces)
2004 capture = self.pg1.get_capture(0)
2006 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2007 ARP(op=ARP.who_has, pdst=static_addr,
2008 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2009 self.pg1.add_stream(p)
2010 self.pg_enable_capture(self.pg_interfaces)
2012 capture = self.pg1.get_capture(0)
2014 def test_vrf_mode(self):
2015 """ NAT44 tenant VRF aware address pool mode """
# Moves pg0 and pg1 into two separate IPv4 tables (vrf_id1, vrf_id2), adds
# one pool address per VRF, and checks that traffic from pg0 is translated to
# nat_ip1 and traffic from pg1 to nat_ip2. The interfaces are moved back to
# table 0 and the tables deleted at the end.
2019 nat_ip1 = "10.0.0.10"
2020 nat_ip2 = "10.0.0.11"
# Addresses are unconfigured before the table change and configured again
# afterwards.
2022 self.pg0.unconfig_ip4()
2023 self.pg1.unconfig_ip4()
2024 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2025 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2026 self.pg0.set_table_ip4(vrf_id1)
2027 self.pg1.set_table_ip4(vrf_id2)
2028 self.pg0.config_ip4()
2029 self.pg1.config_ip4()
2031 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2032 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2033 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2034 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2035 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 (vrf_id1) -> pg2: expect translation to nat_ip1
2039 pkts = self.create_stream_in(self.pg0, self.pg2)
2040 self.pg0.add_stream(pkts)
2041 self.pg_enable_capture(self.pg_interfaces)
2043 capture = self.pg2.get_capture(len(pkts))
2044 self.verify_capture_out(capture, nat_ip1)
# pg1 (vrf_id2) -> pg2: expect translation to nat_ip2
2047 pkts = self.create_stream_in(self.pg1, self.pg2)
2048 self.pg1.add_stream(pkts)
2049 self.pg_enable_capture(self.pg_interfaces)
2051 capture = self.pg2.get_capture(len(pkts))
2052 self.verify_capture_out(capture, nat_ip2)
# Restore: back to table 0 and delete both tables.
# NOTE(review): no try/finally is visible around this restore - confirm it
# also runs when an assertion above fails.
2054 self.pg0.unconfig_ip4()
2055 self.pg1.unconfig_ip4()
2056 self.pg0.set_table_ip4(0)
2057 self.pg1.set_table_ip4(0)
2058 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2059 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2061 def test_vrf_feature_independent(self):
2062 """ NAT44 tenant VRF independent address pool mode """
# Adds nat_ip1 without a VRF and nat_ip2 with vrf_id=99, enables NAT44 on
# pg0, pg1 and pg2, and expects traffic from both pg0 and pg1 to be
# translated to nat_ip1.
2064 nat_ip1 = "10.0.0.10"
2065 nat_ip2 = "10.0.0.11"
2067 self.nat44_add_address(nat_ip1)
2068 self.nat44_add_address(nat_ip2, vrf_id=99)
2069 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2070 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2071 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2
2075 pkts = self.create_stream_in(self.pg0, self.pg2)
2076 self.pg0.add_stream(pkts)
2077 self.pg_enable_capture(self.pg_interfaces)
2079 capture = self.pg2.get_capture(len(pkts))
2080 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2: nat_ip1 is expected here as well, not nat_ip2.
2083 pkts = self.create_stream_in(self.pg1, self.pg2)
2084 self.pg1.add_stream(pkts)
2085 self.pg_enable_capture(self.pg_interfaces)
2087 capture = self.pg2.get_capture(len(pkts))
2088 self.verify_capture_out(capture, nat_ip1)
2090 def test_dynamic_ipless_interfaces(self):
2091 """ NAT44 interfaces without configured IP address """
# Uses pg7 and pg8 with neighbor entries and /32 routes installed by hand for
# their remote hosts, adds nat_addr to the pool, enables NAT44 on both
# interfaces and checks translation in both directions.
2093 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2094 mactobinary(self.pg7.remote_mac),
2095 self.pg7.remote_ip4n,
2097 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2098 mactobinary(self.pg8.remote_mac),
2099 self.pg8.remote_ip4n,
# Host routes: each remote address is reached via itself on its interface.
2102 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2103 dst_address_length=32,
2104 next_hop_address=self.pg7.remote_ip4n,
2105 next_hop_sw_if_index=self.pg7.sw_if_index)
2106 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2107 dst_address_length=32,
2108 next_hop_address=self.pg8.remote_ip4n,
2109 next_hop_sw_if_index=self.pg8.sw_if_index)
2111 self.nat44_add_address(self.nat_addr)
2112 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2113 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8
2117 pkts = self.create_stream_in(self.pg7, self.pg8)
2118 self.pg7.add_stream(pkts)
2119 self.pg_enable_capture(self.pg_interfaces)
2121 capture = self.pg8.get_capture(len(pkts))
2122 self.verify_capture_out(capture)
# pg8 -> pg7, addressed to nat_addr
2125 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2126 self.pg8.add_stream(pkts)
2127 self.pg_enable_capture(self.pg_interfaces)
2129 capture = self.pg7.get_capture(len(pkts))
2130 self.verify_capture_in(capture, self.pg7)
# 1:1 static NAT variant of the IP-less interface test: pg7's remote
# address is statically mapped to self.nat_addr, and the outside-to-inside
# direction is exercised first, then inside-to-outside.
# NOTE(review): numbered lines such as 2138, 2142 and 2156-2158 are absent
# from this listing, so trailing call arguments are not visible.
2132 def test_static_ipless_interfaces(self):
2133 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2135 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2136 mactobinary(self.pg7.remote_mac),
2137 self.pg7.remote_ip4n,
2139 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2140 mactobinary(self.pg8.remote_mac),
2141 self.pg8.remote_ip4n,
# Host (/32) route to each remote address via its own interface.
2144 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2145 dst_address_length=32,
2146 next_hop_address=self.pg7.remote_ip4n,
2147 next_hop_sw_if_index=self.pg7.sw_if_index)
2148 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2149 dst_address_length=32,
2150 next_hop_address=self.pg8.remote_ip4n,
2151 next_hop_sw_if_index=self.pg8.sw_if_index)
# Address-only static mapping (no ports given).
2153 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2154 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2155 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 direction first.
2159 pkts = self.create_stream_out(self.pg8)
2160 self.pg8.add_stream(pkts)
2161 self.pg_enable_capture(self.pg_interfaces)
2163 capture = self.pg7.get_capture(len(pkts))
2164 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8 direction; the third positional argument True is passed to
# verify_capture_out (presumably its same_port flag - TODO confirm).
2167 pkts = self.create_stream_in(self.pg7, self.pg8)
2168 self.pg7.add_stream(pkts)
2169 self.pg_enable_capture(self.pg_interfaces)
2171 capture = self.pg8.get_capture(len(pkts))
2172 self.verify_capture_out(capture, self.nat_addr, True)
# 1:1 NAPT variant of the IP-less interface test: per-protocol static
# mappings (TCP, UDP, ICMP) translate pg7's remote address/ports to
# self.nat_addr with the fixed outside ports set at the top of the test.
# NOTE(review): numbered lines such as 2184, 2188 and 2211-2213 are absent
# from this listing, so trailing call arguments are not visible.
2174 def test_static_with_port_ipless_interfaces(self):
2175 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Outside ports/ICMP id used by the static mappings below.
2177 self.tcp_port_out = 30606
2178 self.udp_port_out = 30607
2179 self.icmp_id_out = 30608
2181 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2182 mactobinary(self.pg7.remote_mac),
2183 self.pg7.remote_ip4n,
2185 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2186 mactobinary(self.pg8.remote_mac),
2187 self.pg8.remote_ip4n,
# Host (/32) route to each remote address via its own interface.
2190 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2191 dst_address_length=32,
2192 next_hop_address=self.pg7.remote_ip4n,
2193 next_hop_sw_if_index=self.pg7.sw_if_index)
2194 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2195 dst_address_length=32,
2196 next_hop_address=self.pg8.remote_ip4n,
2197 next_hop_sw_if_index=self.pg8.sw_if_index)
# One static mapping per protocol, each pairing an inside port/id with
# the matching outside port/id.
2199 self.nat44_add_address(self.nat_addr)
2200 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2201 self.tcp_port_in, self.tcp_port_out,
2202 proto=IP_PROTOS.tcp)
2203 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2204 self.udp_port_in, self.udp_port_out,
2205 proto=IP_PROTOS.udp)
2206 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2207 self.icmp_id_in, self.icmp_id_out,
2208 proto=IP_PROTOS.icmp)
2209 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2210 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 direction first.
2214 pkts = self.create_stream_out(self.pg8)
2215 self.pg8.add_stream(pkts)
2216 self.pg_enable_capture(self.pg_interfaces)
2218 capture = self.pg7.get_capture(len(pkts))
2219 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8 direction.
2222 pkts = self.create_stream_in(self.pg7, self.pg8)
2223 self.pg7.add_stream(pkts)
2224 self.pg_enable_capture(self.pg_interfaces)
2226 capture = self.pg8.get_capture(len(pkts))
2227 self.verify_capture_out(capture)
# With a 1:1 static mapping for pg0's remote address, sends a packet whose
# outer IP header wraps an inner IP/TCP packet in each direction and checks
# that only the outer addresses are rewritten, that a GRE layer is still
# present and that the IP checksum is valid.
# NOTE(review): the layer between the outer and inner IP headers sits on
# lines this listing skips (2240, 2260), as do the lines that bind `packet`
# and open/close the error handling around the assertions.
2229 def test_static_unknown_proto(self):
2230 """ 1:1 NAT translate packet with unknown protocol """
2231 nat_ip = "10.0.0.10"
2232 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2233 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2234 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1: source is expected to become nat_ip.
2238 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2239 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2241 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2242 TCP(sport=1234, dport=1234))
2243 self.pg0.add_stream(p)
2244 self.pg_enable_capture(self.pg_interfaces)
2246 p = self.pg1.get_capture(1)
2249 self.assertEqual(packet[IP].src, nat_ip)
2250 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2251 self.assertTrue(packet.haslayer(GRE))
2252 self.check_ip_checksum(packet)
2254 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> nat_ip: destination is expected to become pg0's remote address.
2258 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2259 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2261 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2262 TCP(sport=1234, dport=1234))
2263 self.pg1.add_stream(p)
2264 self.pg_enable_capture(self.pg_interfaces)
2266 p = self.pg0.get_capture(1)
2269 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2270 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2271 self.assertTrue(packet.haslayer(GRE))
2272 self.check_ip_checksum(packet)
2274 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning variant of the static unknown-protocol test: host and server
# are both remote hosts on pg0, each with its own 1:1 static mapping, and
# packets are sent and captured on pg0 in both directions.
# NOTE(review): the layer between the outer and inner IP headers and the
# lines binding `packet` are on numbered lines this listing skips.
2277 def test_hairpinning_static_unknown_proto(self):
2278 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2280 host = self.pg0.remote_hosts[0]
2281 server = self.pg0.remote_hosts[1]
2283 host_nat_ip = "10.0.0.10"
2284 server_nat_ip = "10.0.0.11"
2286 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2287 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2288 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2289 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server's NAT address; expected src host_nat_ip, dst server.ip4.
2293 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2294 IP(src=host.ip4, dst=server_nat_ip) /
2296 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2297 TCP(sport=1234, dport=1234))
2298 self.pg0.add_stream(p)
2299 self.pg_enable_capture(self.pg_interfaces)
2301 p = self.pg0.get_capture(1)
2304 self.assertEqual(packet[IP].src, host_nat_ip)
2305 self.assertEqual(packet[IP].dst, server.ip4)
2306 self.assertTrue(packet.haslayer(GRE))
2307 self.check_ip_checksum(packet)
2309 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host's NAT address; expected src server_nat_ip, dst host.ip4.
2313 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2314 IP(src=server.ip4, dst=host_nat_ip) /
2316 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2317 TCP(sport=1234, dport=1234))
2318 self.pg0.add_stream(p)
2319 self.pg_enable_capture(self.pg_interfaces)
2321 p = self.pg0.get_capture(1)
2324 self.assertEqual(packet[IP].src, server_nat_ip)
2325 self.assertEqual(packet[IP].dst, host.ip4)
2326 self.assertTrue(packet.haslayer(GRE))
2327 self.check_ip_checksum(packet)
2329 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Dynamic NAT44 with an unknown-protocol payload: a plain TCP packet is
# sent pg0 -> pg1 first, then a packet whose outer IP wraps an inner IP/TCP
# packet is sent in each direction and the outer addresses, GRE layer and
# IP checksum are checked.
# NOTE(review): the layer between the outer and inner IP headers and the
# lines binding `packet` are on numbered lines this listing skips.
2332 def test_unknown_proto(self):
2333 """ NAT44 translate packet with unknown protocol """
2334 self.nat44_add_address(self.nat_addr)
2335 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2336 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Plain TCP packet pg0 -> pg1; its capture is fetched but not inspected.
2340 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2341 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2342 TCP(sport=self.tcp_port_in, dport=20))
2343 self.pg0.add_stream(p)
2344 self.pg_enable_capture(self.pg_interfaces)
2346 p = self.pg1.get_capture(1)
# pg0 -> pg1 with the wrapped payload; source expected to be self.nat_addr.
2348 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2349 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2351 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2352 TCP(sport=1234, dport=1234))
2353 self.pg0.add_stream(p)
2354 self.pg_enable_capture(self.pg_interfaces)
2356 p = self.pg1.get_capture(1)
2359 self.assertEqual(packet[IP].src, self.nat_addr)
2360 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2361 self.assertTrue(packet.haslayer(GRE))
2362 self.check_ip_checksum(packet)
2364 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> NAT address; destination expected to be pg0's remote address.
2368 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2369 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2371 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2372 TCP(sport=1234, dport=1234))
2373 self.pg1.add_stream(p)
2374 self.pg_enable_capture(self.pg_interfaces)
2376 p = self.pg0.get_capture(1)
2379 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2380 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2381 self.assertTrue(packet.haslayer(GRE))
2382 self.check_ip_checksum(packet)
2384 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning variant of the dynamic unknown-protocol test: host and server
# are both remote hosts on pg0; the server has a 1:1 static mapping to
# server_nat_ip while the host is translated to self.nat_addr.
# NOTE(review): `host_in_port` is used below but its assignment is on a
# numbered line this listing skips (2391-2392), as are the layer between
# the outer and inner IP headers and the lines binding `packet`.
2387 def test_hairpinning_unknown_proto(self):
2388 """ NAT44 translate packet with unknown protocol - hairpinning """
2389 host = self.pg0.remote_hosts[0]
2390 server = self.pg0.remote_hosts[1]
2393 server_in_port = 5678
2394 server_out_port = 8765
2395 server_nat_ip = "10.0.0.11"
2397 self.nat44_add_address(self.nat_addr)
2398 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2399 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2402 # add static mapping for server
2403 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# Plain TCP packet host -> server first; its capture is not inspected.
2406 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2407 IP(src=host.ip4, dst=server_nat_ip) /
2408 TCP(sport=host_in_port, dport=server_out_port))
2409 self.pg0.add_stream(p)
2410 self.pg_enable_capture(self.pg_interfaces)
2412 capture = self.pg0.get_capture(1)
# host -> server with the wrapped payload; expected src self.nat_addr.
2414 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2415 IP(src=host.ip4, dst=server_nat_ip) /
2417 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2418 TCP(sport=1234, dport=1234))
2419 self.pg0.add_stream(p)
2420 self.pg_enable_capture(self.pg_interfaces)
2422 p = self.pg0.get_capture(1)
2425 self.assertEqual(packet[IP].src, self.nat_addr)
2426 self.assertEqual(packet[IP].dst, server.ip4)
2427 self.assertTrue(packet.haslayer(GRE))
2428 self.check_ip_checksum(packet)
2430 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> NAT address; expected src server_nat_ip, dst host.ip4.
2434 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2435 IP(src=server.ip4, dst=self.nat_addr) /
2437 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2438 TCP(sport=1234, dport=1234))
2439 self.pg0.add_stream(p)
2440 self.pg_enable_capture(self.pg_interfaces)
2442 p = self.pg0.get_capture(1)
2445 self.assertEqual(packet[IP].src, server_nat_ip)
2446 self.assertEqual(packet[IP].dst, host.ip4)
2447 self.assertTrue(packet.haslayer(GRE))
2448 self.check_ip_checksum(packet)
2450 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Enables the NAT44 output feature on pg0, pg1 and pg3 and checks three
# flows: pg0 -> pg3 (translated), pg3 -> pg0 (translated back) and
# pg2 -> pg0 (verified as not translated).
# NOTE(review): the extra arguments of the pg3 call are on numbered lines
# this listing skips (2459-2461).
2453 def test_output_feature(self):
2454 """ NAT44 interface output feature (in2out postrouting) """
2455 self.nat44_add_address(self.nat_addr)
2456 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2457 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2458 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3.
2462 pkts = self.create_stream_in(self.pg0, self.pg3)
2463 self.pg0.add_stream(pkts)
2464 self.pg_enable_capture(self.pg_interfaces)
2466 capture = self.pg3.get_capture(len(pkts))
2467 self.verify_capture_out(capture)
# pg3 -> pg0.
2470 pkts = self.create_stream_out(self.pg3)
2471 self.pg3.add_stream(pkts)
2472 self.pg_enable_capture(self.pg_interfaces)
2474 capture = self.pg0.get_capture(len(pkts))
2475 self.verify_capture_in(capture, self.pg0)
2477 # from non-NAT interface to NAT inside interface
2478 pkts = self.create_stream_in(self.pg2, self.pg0)
2479 self.pg2.add_stream(pkts)
2480 self.pg_enable_capture(self.pg_interfaces)
2482 capture = self.pg0.get_capture(len(pkts))
2483 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
# VRF-aware output-feature test: one NAT address is bound to VRF 10 and
# another to VRF 20; traffic from pg4 is expected to use the VRF 10 address
# and traffic from pg6 the VRF 20 address, in both directions via pg3.
# NOTE(review): the last argument of each ip_add_del_route call and of the
# pg3 output-feature call is on numbered lines this listing skips
# (2494, 2499, 2506-2508).
2485 def test_output_feature_vrf_aware(self):
2486 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2487 nat_ip_vrf10 = "10.0.0.10"
2488 nat_ip_vrf20 = "10.0.0.20"
# Two /32 routes to pg3's remote address; they differ only in the argument
# that is not visible here (presumably the table id - TODO confirm).
2490 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2491 dst_address_length=32,
2492 next_hop_address=self.pg3.remote_ip4n,
2493 next_hop_sw_if_index=self.pg3.sw_if_index,
2495 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2496 dst_address_length=32,
2497 next_hop_address=self.pg3.remote_ip4n,
2498 next_hop_sw_if_index=self.pg3.sw_if_index,
2501 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2502 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2503 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2504 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2505 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3, expected source nat_ip_vrf10.
2509 pkts = self.create_stream_in(self.pg4, self.pg3)
2510 self.pg4.add_stream(pkts)
2511 self.pg_enable_capture(self.pg_interfaces)
2513 capture = self.pg3.get_capture(len(pkts))
2514 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> nat_ip_vrf10, expected back on pg4.
2517 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2518 self.pg3.add_stream(pkts)
2519 self.pg_enable_capture(self.pg_interfaces)
2521 capture = self.pg4.get_capture(len(pkts))
2522 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3, expected source nat_ip_vrf20.
2525 pkts = self.create_stream_in(self.pg6, self.pg3)
2526 self.pg6.add_stream(pkts)
2527 self.pg_enable_capture(self.pg_interfaces)
2529 capture = self.pg3.get_capture(len(pkts))
2530 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> nat_ip_vrf20, expected back on pg6.
2533 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2534 self.pg3.add_stream(pkts)
2535 self.pg_enable_capture(self.pg_interfaces)
2537 capture = self.pg6.get_capture(len(pkts))
2538 self.verify_capture_in(capture, self.pg6)
2540 def test_output_feature_hairpinning(self):
2541 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2542 host = self.pg0.remote_hosts[0]
2543 server = self.pg0.remote_hosts[1]
2546 server_in_port = 5678
2547 server_out_port = 8765
2549 self.nat44_add_address(self.nat_addr)
2550 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2551 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2554 # add static mapping for server
2555 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2556 server_in_port, server_out_port,
2557 proto=IP_PROTOS.tcp)
2559 # send packet from host to server
2560 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2561 IP(src=host.ip4, dst=self.nat_addr) /
2562 TCP(sport=host_in_port, dport=server_out_port))
2563 self.pg0.add_stream(p)
2564 self.pg_enable_capture(self.pg_interfaces)
2566 capture = self.pg0.get_capture(1)
2571 self.assertEqual(ip.src, self.nat_addr)
2572 self.assertEqual(ip.dst, server.ip4)
2573 self.assertNotEqual(tcp.sport, host_in_port)
2574 self.assertEqual(tcp.dport, server_in_port)
2575 self.check_tcp_checksum(p)
2576 host_out_port = tcp.sport
2578 self.logger.error(ppp("Unexpected or invalid packet:", p))
2581 # send reply from server to host
2582 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2583 IP(src=server.ip4, dst=self.nat_addr) /
2584 TCP(sport=server_in_port, dport=host_out_port))
2585 self.pg0.add_stream(p)
2586 self.pg_enable_capture(self.pg_interfaces)
2588 capture = self.pg0.get_capture(1)
2593 self.assertEqual(ip.src, self.nat_addr)
2594 self.assertEqual(ip.dst, host.ip4)
2595 self.assertEqual(tcp.sport, server_out_port)
2596 self.assertEqual(tcp.dport, host_in_port)
2597 self.check_tcp_checksum(p)
2599 self.logger.error(ppp("Unexpected or invalid packet:"), p)
# One-armed NAT: pg9 has the NAT44 feature enabled twice (once with default
# arguments, once with extra arguments not shown), and both the local and
# the remote host are remote hosts of pg9. Checks in2out translation of a
# TCP packet and out2in translation of the reply.
# NOTE(review): `ip` and `tcp` are bound on numbered lines this listing
# skips (2621-2624, 2644-2647), as are the lines that open/close the error
# handling around the assertions.
2602 def test_one_armed_nat44(self):
2603 """ One armed NAT44 """
2604 remote_host = self.pg9.remote_hosts[0]
2605 local_host = self.pg9.remote_hosts[1]
2608 self.nat44_add_address(self.nat_addr)
2609 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2610 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local_host -> remote_host, sent and captured on pg9.
2614 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2615 IP(src=local_host.ip4, dst=remote_host.ip4) /
2616 TCP(sport=12345, dport=80))
2617 self.pg9.add_stream(p)
2618 self.pg_enable_capture(self.pg_interfaces)
2620 capture = self.pg9.get_capture(1)
2625 self.assertEqual(ip.src, self.nat_addr)
2626 self.assertEqual(ip.dst, remote_host.ip4)
2627 self.assertNotEqual(tcp.sport, 12345)
# Translated source port; the reply below is addressed to it.
2628 external_port = tcp.sport
2629 self.assertEqual(tcp.dport, 80)
2630 self.check_tcp_checksum(p)
2631 self.check_ip_checksum(p)
2633 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> NAT address; expected to reach local_host on port 12345.
2637 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2638 IP(src=remote_host.ip4, dst=self.nat_addr) /
2639 TCP(sport=80, dport=external_port))
2640 self.pg9.add_stream(p)
2641 self.pg_enable_capture(self.pg_interfaces)
2643 capture = self.pg9.get_capture(1)
2648 self.assertEqual(ip.src, remote_host.ip4)
2649 self.assertEqual(ip.dst, local_host.ip4)
2650 self.assertEqual(tcp.sport, 80)
2651 self.assertEqual(tcp.dport, 12345)
2652 self.check_tcp_checksum(p)
2653 self.check_ip_checksum(p)
2655 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Creates sessions by sending a stream pg0 -> pg1, dumps the sessions of
# pg0's remote address, deletes one by its inside address/port/protocol and
# one by its outside address/port/protocol, then checks that the dump
# shrank by exactly two entries.
# NOTE(review): the final argument(s) of the second nat44_del_session call
# are on numbered lines this listing skips (2680-2681).
2658 def test_del_session(self):
2659 """ Delete NAT44 session """
2660 self.nat44_add_address(self.nat_addr)
2661 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2662 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2665 pkts = self.create_stream_in(self.pg0, self.pg1)
2666 self.pg0.add_stream(pkts)
2667 self.pg_enable_capture(self.pg_interfaces)
2669 capture = self.pg1.get_capture(len(pkts))
# Session count before deletion, used as the baseline below.
2671 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2672 nsessions = len(sessions)
2674 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2675 sessions[0].inside_port,
2676 sessions[0].protocol)
2677 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2678 sessions[1].outside_port,
2679 sessions[1].protocol,
2682 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2683 self.assertEqual(nsessions - len(sessions), 2)
# Round-trips the virtual reassembly settings: reads the current config,
# sets timeout + 5, max_reass * 2 and max_frag * 2, reads the config back
# and compares; then sets drop_frag=1 and checks ip4_drop_frag is truthy.
2685 def test_set_get_reass(self):
2686 """ NAT44 set/get virtual fragmentation reassembly """
2687 reas_cfg1 = self.vapi.nat_get_reass()
# New values are derived from the current ones, so they always differ.
2689 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2690 max_reass=reas_cfg1.ip4_max_reass * 2,
2691 max_frag=reas_cfg1.ip4_max_frag * 2)
2693 reas_cfg2 = self.vapi.nat_get_reass()
2695 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2696 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2697 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2699 self.vapi.nat_set_reass(drop_frag=1)
2700 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
# Sends a fragmented TCP payload pg0 -> pg1 and a fragmented reply
# pg1 -> pg0, reassembles the captured fragments and checks ports and
# payload; finally checks that the reassembly dump grew by two entries.
# NOTE(review): most create_stream_frag / reass_frags_and_verify arguments
# are on numbered lines this listing skips (e.g. 2718-2720, 2726,
# 2735-2738).
2702 def test_frag_in_order(self):
2703 """ NAT44 translate fragments arriving in order """
2704 self.nat44_add_address(self.nat_addr)
2705 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2706 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2709 data = "A" * 4 + "B" * 16 + "C" * 3
# Randomised inside port, stored on self and asserted against below.
2710 self.tcp_port_in = random.randint(1025, 65535)
# Baseline reassembly count, compared with the count at the end.
2712 reass = self.vapi.nat_reass_dump()
2713 reass_n_start = len(reass)
2716 pkts = self.create_stream_frag(self.pg0,
2717 self.pg1.remote_ip4,
2721 self.pg0.add_stream(pkts)
2722 self.pg_enable_capture(self.pg_interfaces)
2724 frags = self.pg1.get_capture(len(pkts))
2725 p = self.reass_frags_and_verify(frags,
2727 self.pg1.remote_ip4)
2728 self.assertEqual(p[TCP].dport, 20)
2729 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2730 self.tcp_port_out = p[TCP].sport
2731 self.assertEqual(data, p[Raw].load)
# Reply direction pg1 -> pg0.
2734 pkts = self.create_stream_frag(self.pg1,
2739 self.pg1.add_stream(pkts)
2740 self.pg_enable_capture(self.pg_interfaces)
2742 frags = self.pg0.get_capture(len(pkts))
2743 p = self.reass_frags_and_verify(frags,
2744 self.pg1.remote_ip4,
2745 self.pg0.remote_ip4)
2746 self.assertEqual(p[TCP].sport, 20)
2747 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2748 self.assertEqual(data, p[Raw].load)
2750 reass = self.vapi.nat_reass_dump()
2751 reass_n_end = len(reass)
2753 self.assertEqual(reass_n_end - reass_n_start, 2)
# Fragmented hairpinning: host and server are both remote hosts on pg0 and
# the server has a static TCP port mapping on self.nat_addr. Fragments are
# sent and captured on pg0, reassembled, and the translated ports and the
# payload are checked.
# NOTE(review): the create_stream_frag / reass_frags_and_verify arguments
# are on numbered lines this listing skips (2776-2779, 2785-2786).
2755 def test_reass_hairpinning(self):
2756 """ NAT44 fragments hairpinning """
2757 host = self.pg0.remote_hosts[0]
2758 server = self.pg0.remote_hosts[1]
# Ports are randomised per run.
2759 host_in_port = random.randint(1025, 65535)
2761 server_in_port = random.randint(1025, 65535)
2762 server_out_port = random.randint(1025, 65535)
2763 data = "A" * 4 + "B" * 16 + "C" * 3
2765 self.nat44_add_address(self.nat_addr)
2766 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2767 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2769 # add static mapping for server
2770 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2771 server_in_port, server_out_port,
2772 proto=IP_PROTOS.tcp)
2774 # send packet from host to server
2775 pkts = self.create_stream_frag(self.pg0,
2780 self.pg0.add_stream(pkts)
2781 self.pg_enable_capture(self.pg_interfaces)
2783 frags = self.pg0.get_capture(len(pkts))
2784 p = self.reass_frags_and_verify(frags,
2787 self.assertNotEqual(p[TCP].sport, host_in_port)
2788 self.assertEqual(p[TCP].dport, server_in_port)
2789 self.assertEqual(data, p[Raw].load)
2791 def test_frag_out_of_order(self):
2792 """ NAT44 translate fragments arriving out of order """
2793 self.nat44_add_address(self.nat_addr)
2794 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2795 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2798 data = "A" * 4 + "B" * 16 + "C" * 3
2799 random.randint(1025, 65535)
2802 pkts = self.create_stream_frag(self.pg0,
2803 self.pg1.remote_ip4,
2808 self.pg0.add_stream(pkts)
2809 self.pg_enable_capture(self.pg_interfaces)
2811 frags = self.pg1.get_capture(len(pkts))
2812 p = self.reass_frags_and_verify(frags,
2814 self.pg1.remote_ip4)
2815 self.assertEqual(p[TCP].dport, 20)
2816 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2817 self.tcp_port_out = p[TCP].sport
2818 self.assertEqual(data, p[Raw].load)
2821 pkts = self.create_stream_frag(self.pg1,
2827 self.pg1.add_stream(pkts)
2828 self.pg_enable_capture(self.pg_interfaces)
2830 frags = self.pg0.get_capture(len(pkts))
2831 p = self.reass_frags_and_verify(frags,
2832 self.pg1.remote_ip4,
2833 self.pg0.remote_ip4)
2834 self.assertEqual(p[TCP].sport, 20)
2835 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2836 self.assertEqual(data, p[Raw].load)
# Switches the address/port assignment algorithm to MAP-E via the CLI
# (psid 10, psid-offset 6, psid-len 6), sends one TCP packet pg0 -> pg1 and
# checks that the translated source port carries the PSID.
# NOTE(review): `p`, `ip` and `tcp` are re-bound on numbered lines this
# listing skips (2854-2857), as are the lines that open/close the error
# handling around the assertions.
2838 def test_port_restricted(self):
2839 """ Port restricted NAT44 (MAP-E CE) """
2840 self.nat44_add_address(self.nat_addr)
2841 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2842 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2844 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
2845 "psid-offset 6 psid-len 6")
2847 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2848 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2849 TCP(sport=4567, dport=22))
2850 self.pg0.add_stream(p)
2851 self.pg_enable_capture(self.pg_interfaces)
2853 capture = self.pg1.get_capture(1)
2858 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2859 self.assertEqual(ip.src, self.nat_addr)
2860 self.assertEqual(tcp.dport, 22)
2861 self.assertNotEqual(tcp.sport, 4567)
# Drop the low 6 bits and keep the next 6: that field must equal the
# configured psid (10).
2862 self.assertEqual((tcp.sport >> 6) & 63, 10)
2863 self.check_tcp_checksum(p)
2864 self.check_ip_checksum(p)
2866 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Twice-NAT test: an outside packet to the static mapping on self.nat_addr
# is expected to arrive on pg0 with its source rewritten to twice_nat_addr
# and its destination rewritten to pg0's remote address; the reply is
# expected on pg1 with both translations reversed.
# NOTE(review): `port_in`, `port_out`, `eh_port_out`, `ip` and `tcp` are
# bound on numbered lines this listing skips (2872-2875, 2892-2895,
# 2914-2917), as is the last argument of the static-mapping call (2880).
2869 def test_twice_nat(self):
2871 twice_nat_addr = '10.0.1.3'
2876 self.nat44_add_address(self.nat_addr)
2877 self.nat44_add_address(twice_nat_addr, twice_nat=1)
2878 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2879 port_in, port_out, proto=IP_PROTOS.tcp,
2881 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2882 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside -> inside through the static mapping.
2885 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2886 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2887 TCP(sport=eh_port_out, dport=port_out))
2888 self.pg1.add_stream(p)
2889 self.pg_enable_capture(self.pg_interfaces)
2891 capture = self.pg0.get_capture(1)
2896 self.assertEqual(ip.dst, self.pg0.remote_ip4)
2897 self.assertEqual(ip.src, twice_nat_addr)
2898 self.assertEqual(tcp.dport, port_in)
2899 self.assertNotEqual(tcp.sport, eh_port_out)
# Translated external-host port; the reply below is addressed to it.
2900 eh_port_in = tcp.sport
2901 self.check_tcp_checksum(p)
2902 self.check_ip_checksum(p)
2904 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Inside -> outside reply, addressed to the twice-NAT address.
2907 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2908 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
2909 TCP(sport=port_in, dport=eh_port_in))
2910 self.pg0.add_stream(p)
2911 self.pg_enable_capture(self.pg_interfaces)
2913 capture = self.pg1.get_capture(1)
2918 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2919 self.assertEqual(ip.src, self.nat_addr)
2920 self.assertEqual(tcp.dport, eh_port_out)
2921 self.assertEqual(tcp.sport, port_out)
2922 self.check_tcp_checksum(p)
2923 self.check_ip_checksum(p)
2925 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Twice-NAT with a load-balanced static mapping over two servers on pg0:
# an outside packet must arrive at one of the two servers with its source
# rewritten to twice_nat_addr, and that server's reply must leave pg1 with
# the external address/port restored.
# NOTE(review): `external_port`, `local_port`, `eh_port_out`, `server`,
# `ip` and `tcp` are bound on numbered lines this listing skips, as are
# most arguments of the lb-static-mapping call (2950-2952, 2954).
# NOTE(review): the local name `locals` shadows the Python builtin.
2928 def test_twice_nat_lb(self):
2929 """ Twice NAT44 local service load balancing """
2930 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
2931 twice_nat_addr = '10.0.1.3'
2936 server1 = self.pg0.remote_hosts[0]
2937 server2 = self.pg0.remote_hosts[1]
2939 locals = [{'addr': server1.ip4n,
2942 {'addr': server2.ip4n,
2946 self.nat44_add_address(self.nat_addr)
2947 self.nat44_add_address(twice_nat_addr, twice_nat=1)
2949 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
2953 local_num=len(locals),
2955 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2956 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside -> inside through the load-balanced mapping.
2959 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2960 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2961 TCP(sport=eh_port_out, dport=external_port))
2962 self.pg1.add_stream(p)
2963 self.pg_enable_capture(self.pg_interfaces)
2965 capture = self.pg0.get_capture(1)
2971 self.assertEqual(ip.src, twice_nat_addr)
# Either backend may be picked.
2972 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
2973 if ip.dst == server1.ip4:
2977 self.assertNotEqual(tcp.sport, eh_port_out)
2978 eh_port_in = tcp.sport
2979 self.assertEqual(tcp.dport, local_port)
2980 self.check_tcp_checksum(p)
2981 self.check_ip_checksum(p)
2983 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Reply from the selected server, addressed to the twice-NAT address.
2986 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2987 IP(src=server.ip4, dst=twice_nat_addr) /
2988 TCP(sport=local_port, dport=eh_port_in))
2989 self.pg0.add_stream(p)
2990 self.pg_enable_capture(self.pg_interfaces)
2992 capture = self.pg1.get_capture(1)
2997 self.assertEqual(ip.src, self.nat_addr)
2998 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2999 self.assertEqual(tcp.sport, external_port)
3000 self.assertEqual(tcp.dport, eh_port_out)
3001 self.check_tcp_checksum(p)
3002 self.check_ip_checksum(p)
3004 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Registers pg7 as a source of twice-NAT addresses and checks that the NAT
# address dump follows the interface: empty before pg7 has an address, one
# twice_nat entry equal to pg7's local address once configured, and empty
# again after the address is removed.
3007 def test_twice_nat_interface_addr(self):
3008 """ Acquire twice NAT44 addresses from interface """
3009 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3011 # no address in NAT pool
3012 adresses = self.vapi.nat44_address_dump()
3013 self.assertEqual(0, len(adresses))
3015 # configure interface address and check NAT address pool
3016 self.pg7.config_ip4()
3017 adresses = self.vapi.nat44_address_dump()
3018 self.assertEqual(1, len(adresses))
# Only the first 4 bytes of ip_address are compared with the IPv4 address.
3019 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3020 self.assertEqual(adresses[0].twice_nat, 1)
3022 # remove interface address and check NAT address pool
3023 self.pg7.unconfig_ip4()
3024 adresses = self.vapi.nat44_address_dump()
3025 self.assertEqual(0, len(adresses))
# Body of TestNAT44's tearDown (its `def` line, numbered 3027, is absent
# from this listing): runs the parent tearDown, then, unless VPP is dead,
# logs the NAT44 and virtual-reassembly state and resets the address/port
# assignment algorithm to the default.
3028 super(TestNAT44, self).tearDown()
3029 if not self.vpp_dead:
3030 self.logger.info(self.vapi.cli("show nat44 verbose"))
3031 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# Undoes the map-e setting that test_port_restricted applies via the CLI.
3032 self.vapi.cli("nat addr-port-assignment-alg default")
3036 class TestDeterministicNAT(MethodHolder):
3037 """ Deterministic NAT Test Cases """
# Extends the parent's VPP command line with a `nat { deterministic }`
# stanza so this class runs against the NAT plugin in deterministic mode.
3040 def setUpConstants(cls):
3041 super(TestDeterministicNAT, cls).setUpConstants()
3042 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
# Class-level fixture: fixes the inside/external ports, ICMP id and NAT
# address used by the helpers, creates three pg interfaces, and generates
# two remote hosts on pg0 with their IPv4 neighbors configured.
# NOTE(review): the body of the `for` loop (3060-3062) and the lines that
# wrap this setup in error handling (3047-3048, 3067, 3069) are absent from
# this listing; the tearDownClass call at 3068 presumably runs only on
# failure - TODO confirm.
3045 def setUpClass(cls):
3046 super(TestDeterministicNAT, cls).setUpClass()
# Inside and external ports are identical for TCP and for UDP.
3049 cls.tcp_port_in = 6303
3050 cls.tcp_external_port = 6303
3051 cls.udp_port_in = 6304
3052 cls.udp_external_port = 6304
3053 cls.icmp_id_in = 6305
3054 cls.nat_addr = '10.0.0.3'
3056 cls.create_pg_interfaces(range(3))
3057 cls.interfaces = list(cls.pg_interfaces)
3059 for i in cls.interfaces:
3064 cls.pg0.generate_remote_hosts(2)
3065 cls.pg0.configure_ipv4_neighbors()
3068 super(TestDeterministicNAT, cls).tearDownClass()
# Builds inside-network packets from in_if's remote address to out_if's
# remote address with the given TTL: one TCP, one UDP and one ICMP
# echo-request, using the class-level inside/external ports and ICMP id.
# NOTE(review): the lines that collect and return the packets are on
# numbered lines this listing skips.
3071 def create_stream_in(self, in_if, out_if, ttl=64):
3073 Create packet stream for inside network
3075 :param in_if: Inside interface
3076 :param out_if: Outside interface
3077 :param ttl: TTL of generated packets
3081 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3082 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3083 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3087 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3088 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3089 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3093 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3094 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3095 ICMP(id=self.icmp_id_in, type='echo-request'))
# Builds outside-network packets from out_if's remote address to dst_ip
# (self.nat_addr is assigned at line 3109; the condition guarding it is not
# visible): one TCP, one UDP and one ICMP echo-reply. The destination
# ports/ICMP id come from self.tcp_port_out, self.udp_port_out and
# self.icmp_external_id, which verify_capture_out records from captured
# packets, so that method must have run first.
# NOTE(review): the lines that collect and return the packets are on
# numbered lines this listing skips.
3100 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3102 Create packet stream for outside network
3104 :param out_if: Outside interface
3105 :param dst_ip: Destination IP address (Default use global NAT address)
3106 :param ttl: TTL of generated packets
3109 dst_ip = self.nat_addr
3112 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3113 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3114 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3118 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3119 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3120 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3124 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3125 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3126 ICMP(id=self.icmp_external_id, type='echo-reply'))
# Checks packets captured on the outside network: the capture must contain
# packet_num packets and each must have nat_ip as its source address. As a
# side effect it records the translated TCP/UDP source port or ICMP id on
# self (tcp_port_out, udp_port_out, icmp_external_id).
# NOTE(review): the docstring below documents a `same_port` parameter that
# this signature does not have, and spells "Source" as "Sorce".
# NOTE(review): the condition guarding the nat_ip default (3139-3140) and
# the error-handling lines around the per-packet checks are absent from
# this listing.
3131 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3133 Verify captured packets on outside network
3135 :param capture: Captured packets
3136 :param nat_ip: Translated IP address (Default use global NAT address)
3137 :param same_port: Sorce port number is not translated (Default False)
3138 :param packet_num: Expected number of packets (Default 3)
3141 nat_ip = self.nat_addr
3142 self.assertEqual(packet_num, len(capture))
3143 for packet in capture:
3145 self.assertEqual(packet[IP].src, nat_ip)
3146 if packet.haslayer(TCP):
3147 self.tcp_port_out = packet[TCP].sport
3148 elif packet.haslayer(UDP):
3149 self.udp_port_out = packet[UDP].sport
3151 self.icmp_external_id = packet[ICMP].id
3153 self.logger.error(ppp("Unexpected or invalid packet "
3154 "(outside network):", packet))
# Drives three TCP packets through the NAT, which the comments and the
# error message describe as a 3-way handshake: in->out, out->in addressed
# to the translated port learned from the first capture, then in->out
# again. Logs "TCP 3 way handshake failed" on the error path.
# NOTE(review): the TCP flags arguments, the add_stream calls for the
# in->out packets and the error-handling lines are on numbered lines this
# listing skips (e.g. 3169-3170, 3172, 3174, 3181, 3191-3192).
3157 def initiate_tcp_session(self, in_if, out_if):
3159 Initiates TCP session
3161 :param in_if: Inside interface
3162 :param out_if: Outside interface
3165 # SYN packet in->out
3166 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3167 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3168 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3171 self.pg_enable_capture(self.pg_interfaces)
3173 capture = out_if.get_capture(1)
# Translated source port, used as the destination of the SYN+ACK below.
3175 self.tcp_port_out = p[TCP].sport
3177 # SYN + ACK packet out->in
3178 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3179 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3180 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3182 out_if.add_stream(p)
3183 self.pg_enable_capture(self.pg_interfaces)
3185 in_if.get_capture(1)
3187 # ACK packet in->out
3188 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3189 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3190 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3193 self.pg_enable_capture(self.pg_interfaces)
3195 out_if.get_capture(1)
3198 self.logger.error("TCP 3 way handshake failed")
3201 def verify_ipfix_max_entries_per_user(self, data):
3203 Verify IPFIX maximum entries per user exceeded event
3205 :param data: Decoded IPFIX data records
3207 self.assertEqual(1, len(data))
3210 self.assertEqual(ord(record[230]), 13)
3211 # natQuotaExceededEvent
3212 self.assertEqual('\x03\x00\x00\x00', record[466])
3214 self.assertEqual(self.pg0.remote_ip4n, record[8])
3216 def test_deterministic_mode(self):
3217 """ NAT plugin run deterministic mode """
3218 in_addr = '172.16.255.0'
3219 out_addr = '172.17.255.50'
3220 in_addr_t = '172.16.255.20'
3221 in_addr_n = socket.inet_aton(in_addr)
3222 out_addr_n = socket.inet_aton(out_addr)
3223 in_addr_t_n = socket.inet_aton(in_addr_t)
3227 nat_config = self.vapi.nat_show_config()
3228 self.assertEqual(1, nat_config.deterministic)
3230 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
3232 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
3233 self.assertEqual(rep1.out_addr[:4], out_addr_n)
3234 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
3235 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
3237 deterministic_mappings = self.vapi.nat_det_map_dump()
3238 self.assertEqual(len(deterministic_mappings), 1)
3239 dsm = deterministic_mappings[0]
3240 self.assertEqual(in_addr_n, dsm.in_addr[:4])
3241 self.assertEqual(in_plen, dsm.in_plen)
3242 self.assertEqual(out_addr_n, dsm.out_addr[:4])
3243 self.assertEqual(out_plen, dsm.out_plen)
3245 self.clear_nat_det()
3246 deterministic_mappings = self.vapi.nat_det_map_dump()
3247 self.assertEqual(len(deterministic_mappings), 0)
3249 def test_set_timeouts(self):
3250 """ Set deterministic NAT timeouts """
3251 timeouts_before = self.vapi.nat_det_get_timeouts()
3253 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
3254 timeouts_before.tcp_established + 10,
3255 timeouts_before.tcp_transitory + 10,
3256 timeouts_before.icmp + 10)
3258 timeouts_after = self.vapi.nat_det_get_timeouts()
3260 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
3261 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
3262 self.assertNotEqual(timeouts_before.tcp_established,
3263 timeouts_after.tcp_established)
3264 self.assertNotEqual(timeouts_before.tcp_transitory,
3265 timeouts_after.tcp_transitory)
3267 def test_det_in(self):
3268 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3270 nat_ip = "10.0.0.10"
3272 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3274 socket.inet_aton(nat_ip),
3276 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3277 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3281 pkts = self.create_stream_in(self.pg0, self.pg1)
3282 self.pg0.add_stream(pkts)
3283 self.pg_enable_capture(self.pg_interfaces)
3285 capture = self.pg1.get_capture(len(pkts))
3286 self.verify_capture_out(capture, nat_ip)
3289 pkts = self.create_stream_out(self.pg1, nat_ip)
3290 self.pg1.add_stream(pkts)
3291 self.pg_enable_capture(self.pg_interfaces)
3293 capture = self.pg0.get_capture(len(pkts))
3294 self.verify_capture_in(capture, self.pg0)
3297 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3298 self.assertEqual(len(sessions), 3)
3302 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3303 self.assertEqual(s.in_port, self.tcp_port_in)
3304 self.assertEqual(s.out_port, self.tcp_port_out)
3305 self.assertEqual(s.ext_port, self.tcp_external_port)
3309 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3310 self.assertEqual(s.in_port, self.udp_port_in)
3311 self.assertEqual(s.out_port, self.udp_port_out)
3312 self.assertEqual(s.ext_port, self.udp_external_port)
3316 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3317 self.assertEqual(s.in_port, self.icmp_id_in)
3318 self.assertEqual(s.out_port, self.icmp_external_id)
3320 def test_multiple_users(self):
3321 """ Deterministic NAT multiple users """
3323 nat_ip = "10.0.0.10"
3325 external_port = 6303
3327 host0 = self.pg0.remote_hosts[0]
3328 host1 = self.pg0.remote_hosts[1]
3330 self.vapi.nat_det_add_del_map(host0.ip4n,
3332 socket.inet_aton(nat_ip),
3334 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3335 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3339 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3340 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3341 TCP(sport=port_in, dport=external_port))
3342 self.pg0.add_stream(p)
3343 self.pg_enable_capture(self.pg_interfaces)
3345 capture = self.pg1.get_capture(1)
3350 self.assertEqual(ip.src, nat_ip)
3351 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3352 self.assertEqual(tcp.dport, external_port)
3353 port_out0 = tcp.sport
3355 self.logger.error(ppp("Unexpected or invalid packet:", p))
3359 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
3360 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
3361 TCP(sport=port_in, dport=external_port))
3362 self.pg0.add_stream(p)
3363 self.pg_enable_capture(self.pg_interfaces)
3365 capture = self.pg1.get_capture(1)
3370 self.assertEqual(ip.src, nat_ip)
3371 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3372 self.assertEqual(tcp.dport, external_port)
3373 port_out1 = tcp.sport
3375 self.logger.error(ppp("Unexpected or invalid packet:", p))
3378 dms = self.vapi.nat_det_map_dump()
3379 self.assertEqual(1, len(dms))
3380 self.assertEqual(2, dms[0].ses_num)
3383 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3384 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3385 TCP(sport=external_port, dport=port_out0))
3386 self.pg1.add_stream(p)
3387 self.pg_enable_capture(self.pg_interfaces)
3389 capture = self.pg0.get_capture(1)
3394 self.assertEqual(ip.src, self.pg1.remote_ip4)
3395 self.assertEqual(ip.dst, host0.ip4)
3396 self.assertEqual(tcp.dport, port_in)
3397 self.assertEqual(tcp.sport, external_port)
3399 self.logger.error(ppp("Unexpected or invalid packet:", p))
3403 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3404 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3405 TCP(sport=external_port, dport=port_out1))
3406 self.pg1.add_stream(p)
3407 self.pg_enable_capture(self.pg_interfaces)
3409 capture = self.pg0.get_capture(1)
3414 self.assertEqual(ip.src, self.pg1.remote_ip4)
3415 self.assertEqual(ip.dst, host1.ip4)
3416 self.assertEqual(tcp.dport, port_in)
3417 self.assertEqual(tcp.sport, external_port)
3419 self.logger.error(ppp("Unexpected or invalid packet", p))
3422 # session close api test
3423 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
3425 self.pg1.remote_ip4n,
3427 dms = self.vapi.nat_det_map_dump()
3428 self.assertEqual(dms[0].ses_num, 1)
3430 self.vapi.nat_det_close_session_in(host0.ip4n,
3432 self.pg1.remote_ip4n,
3434 dms = self.vapi.nat_det_map_dump()
3435 self.assertEqual(dms[0].ses_num, 0)
3437 def test_tcp_session_close_detection_in(self):
3438 """ Deterministic NAT TCP session close from inside network """
3439 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3441 socket.inet_aton(self.nat_addr),
3443 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3444 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3447 self.initiate_tcp_session(self.pg0, self.pg1)
3449 # close the session from inside
3451 # FIN packet in -> out
3452 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3453 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3454 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3456 self.pg0.add_stream(p)
3457 self.pg_enable_capture(self.pg_interfaces)
3459 self.pg1.get_capture(1)
3463 # ACK packet out -> in
3464 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3465 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3466 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3470 # FIN packet out -> in
3471 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3472 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3473 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3477 self.pg1.add_stream(pkts)
3478 self.pg_enable_capture(self.pg_interfaces)
3480 self.pg0.get_capture(2)
3482 # ACK packet in -> out
3483 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3484 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3485 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3487 self.pg0.add_stream(p)
3488 self.pg_enable_capture(self.pg_interfaces)
3490 self.pg1.get_capture(1)
3492 # Check if deterministic NAT44 closed the session
3493 dms = self.vapi.nat_det_map_dump()
3494 self.assertEqual(0, dms[0].ses_num)
3496 self.logger.error("TCP session termination failed")
3499 def test_tcp_session_close_detection_out(self):
3500 """ Deterministic NAT TCP session close from outside network """
3501 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3503 socket.inet_aton(self.nat_addr),
3505 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3506 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3509 self.initiate_tcp_session(self.pg0, self.pg1)
3511 # close the session from outside
3513 # FIN packet out -> in
3514 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3515 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3516 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3518 self.pg1.add_stream(p)
3519 self.pg_enable_capture(self.pg_interfaces)
3521 self.pg0.get_capture(1)
3525 # ACK packet in -> out
3526 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3527 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3528 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3532 # ACK packet in -> out
3533 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3534 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3535 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3539 self.pg0.add_stream(pkts)
3540 self.pg_enable_capture(self.pg_interfaces)
3542 self.pg1.get_capture(2)
3544 # ACK packet out -> in
3545 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3546 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3547 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3549 self.pg1.add_stream(p)
3550 self.pg_enable_capture(self.pg_interfaces)
3552 self.pg0.get_capture(1)
3554 # Check if deterministic NAT44 closed the session
3555 dms = self.vapi.nat_det_map_dump()
3556 self.assertEqual(0, dms[0].ses_num)
3558 self.logger.error("TCP session termination failed")
3561 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3562 def test_session_timeout(self):
3563 """ Deterministic NAT session timeouts """
3564 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3566 socket.inet_aton(self.nat_addr),
3568 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3569 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3572 self.initiate_tcp_session(self.pg0, self.pg1)
3573 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3574 pkts = self.create_stream_in(self.pg0, self.pg1)
3575 self.pg0.add_stream(pkts)
3576 self.pg_enable_capture(self.pg_interfaces)
3578 capture = self.pg1.get_capture(len(pkts))
3581 dms = self.vapi.nat_det_map_dump()
3582 self.assertEqual(0, dms[0].ses_num)
3584 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3585 def test_session_limit_per_user(self):
3586 """ Deterministic NAT maximum sessions per user limit """
3587 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3589 socket.inet_aton(self.nat_addr),
3591 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3592 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3594 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3595 src_address=self.pg2.local_ip4n,
3597 template_interval=10)
3598 self.vapi.nat_ipfix()
3601 for port in range(1025, 2025):
3602 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3603 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3604 UDP(sport=port, dport=port))
3607 self.pg0.add_stream(pkts)
3608 self.pg_enable_capture(self.pg_interfaces)
3610 capture = self.pg1.get_capture(len(pkts))
3612 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3613 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3614 UDP(sport=3001, dport=3002))
3615 self.pg0.add_stream(p)
3616 self.pg_enable_capture(self.pg_interfaces)
3618 capture = self.pg1.assert_nothing_captured()
3620 # verify ICMP error packet
3621 capture = self.pg0.get_capture(1)
3623 self.assertTrue(p.haslayer(ICMP))
3625 self.assertEqual(icmp.type, 3)
3626 self.assertEqual(icmp.code, 1)
3627 self.assertTrue(icmp.haslayer(IPerror))
3628 inner_ip = icmp[IPerror]
3629 self.assertEqual(inner_ip[UDPerror].sport, 3001)
3630 self.assertEqual(inner_ip[UDPerror].dport, 3002)
3632 dms = self.vapi.nat_det_map_dump()
3634 self.assertEqual(1000, dms[0].ses_num)
3636 # verify IPFIX logging
3637 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3639 capture = self.pg2.get_capture(2)
3640 ipfix = IPFIXDecoder()
3641 # first load template
3643 self.assertTrue(p.haslayer(IPFIX))
3644 if p.haslayer(Template):
3645 ipfix.add_template(p.getlayer(Template))
3646 # verify events in data set
3648 if p.haslayer(Data):
3649 data = ipfix.decode_data_set(p.getlayer(Set))
3650 self.verify_ipfix_max_entries_per_user(data)
3652 def clear_nat_det(self):
3654 Clear deterministic NAT configuration.
3656 self.vapi.nat_ipfix(enable=0)
3657 self.vapi.nat_det_set_timeouts()
3658 deterministic_mappings = self.vapi.nat_det_map_dump()
3659 for dsm in deterministic_mappings:
3660 self.vapi.nat_det_add_del_map(dsm.in_addr,
3666 interfaces = self.vapi.nat44_interface_dump()
3667 for intf in interfaces:
3668 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3673 super(TestDeterministicNAT, self).tearDown()
3674 if not self.vpp_dead:
3675 self.logger.info(self.vapi.cli("show nat44 detail"))
3676 self.clear_nat_det()
3679 class TestNAT64(MethodHolder):
3680 """ NAT64 Test Cases """
3683 def setUpClass(cls):
3684 super(TestNAT64, cls).setUpClass()
3687 cls.tcp_port_in = 6303
3688 cls.tcp_port_out = 6303
3689 cls.udp_port_in = 6304
3690 cls.udp_port_out = 6304
3691 cls.icmp_id_in = 6305
3692 cls.icmp_id_out = 6305
3693 cls.nat_addr = '10.0.0.3'
3694 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3696 cls.vrf1_nat_addr = '10.0.10.3'
3697 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3700 cls.create_pg_interfaces(range(5))
3701 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3702 cls.ip6_interfaces.append(cls.pg_interfaces[2])
3703 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3705 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3707 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3709 cls.pg0.generate_remote_hosts(2)
3711 for i in cls.ip6_interfaces:
3714 i.configure_ipv6_neighbors()
3716 for i in cls.ip4_interfaces:
3722 cls.pg3.config_ip4()
3723 cls.pg3.resolve_arp()
3724 cls.pg3.config_ip6()
3725 cls.pg3.configure_ipv6_neighbors()
3728 super(TestNAT64, cls).tearDownClass()
3731 def test_pool(self):
3732 """ Add/delete address to NAT64 pool """
3733 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3735 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3737 addresses = self.vapi.nat64_pool_addr_dump()
3738 self.assertEqual(len(addresses), 1)
3739 self.assertEqual(addresses[0].address, nat_addr)
3741 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3743 addresses = self.vapi.nat64_pool_addr_dump()
3744 self.assertEqual(len(addresses), 0)
3746 def test_interface(self):
3747 """ Enable/disable NAT64 feature on the interface """
3748 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3749 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3751 interfaces = self.vapi.nat64_interface_dump()
3752 self.assertEqual(len(interfaces), 2)
3755 for intf in interfaces:
3756 if intf.sw_if_index == self.pg0.sw_if_index:
3757 self.assertEqual(intf.is_inside, 1)
3759 elif intf.sw_if_index == self.pg1.sw_if_index:
3760 self.assertEqual(intf.is_inside, 0)
3762 self.assertTrue(pg0_found)
3763 self.assertTrue(pg1_found)
3765 features = self.vapi.cli("show interface features pg0")
3766 self.assertNotEqual(features.find('nat64-in2out'), -1)
3767 features = self.vapi.cli("show interface features pg1")
3768 self.assertNotEqual(features.find('nat64-out2in'), -1)
3770 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3771 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3773 interfaces = self.vapi.nat64_interface_dump()
3774 self.assertEqual(len(interfaces), 0)
3776 def test_static_bib(self):
3777 """ Add/delete static BIB entry """
3778 in_addr = socket.inet_pton(socket.AF_INET6,
3779 '2001:db8:85a3::8a2e:370:7334')
3780 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3783 proto = IP_PROTOS.tcp
3785 self.vapi.nat64_add_del_static_bib(in_addr,
3790 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3795 self.assertEqual(bibe.i_addr, in_addr)
3796 self.assertEqual(bibe.o_addr, out_addr)
3797 self.assertEqual(bibe.i_port, in_port)
3798 self.assertEqual(bibe.o_port, out_port)
3799 self.assertEqual(static_bib_num, 1)
3801 self.vapi.nat64_add_del_static_bib(in_addr,
3807 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3812 self.assertEqual(static_bib_num, 0)
3814 def test_set_timeouts(self):
3815 """ Set NAT64 timeouts """
3816 # verify default values
3817 timeouts = self.vapi.nat64_get_timeouts()
3818 self.assertEqual(timeouts.udp, 300)
3819 self.assertEqual(timeouts.icmp, 60)
3820 self.assertEqual(timeouts.tcp_trans, 240)
3821 self.assertEqual(timeouts.tcp_est, 7440)
3822 self.assertEqual(timeouts.tcp_incoming_syn, 6)
3824 # set and verify custom values
3825 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3826 tcp_est=7450, tcp_incoming_syn=10)
3827 timeouts = self.vapi.nat64_get_timeouts()
3828 self.assertEqual(timeouts.udp, 200)
3829 self.assertEqual(timeouts.icmp, 30)
3830 self.assertEqual(timeouts.tcp_trans, 250)
3831 self.assertEqual(timeouts.tcp_est, 7450)
3832 self.assertEqual(timeouts.tcp_incoming_syn, 10)
3834 def test_dynamic(self):
3835 """ NAT64 dynamic translation test """
3836 self.tcp_port_in = 6303
3837 self.udp_port_in = 6304
3838 self.icmp_id_in = 6305
3840 ses_num_start = self.nat64_get_ses_num()
3842 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3844 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3845 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3848 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3849 self.pg0.add_stream(pkts)
3850 self.pg_enable_capture(self.pg_interfaces)
3852 capture = self.pg1.get_capture(len(pkts))
3853 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3854 dst_ip=self.pg1.remote_ip4)
3857 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3858 self.pg1.add_stream(pkts)
3859 self.pg_enable_capture(self.pg_interfaces)
3861 capture = self.pg0.get_capture(len(pkts))
3862 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3863 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3866 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3867 self.pg0.add_stream(pkts)
3868 self.pg_enable_capture(self.pg_interfaces)
3870 capture = self.pg1.get_capture(len(pkts))
3871 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3872 dst_ip=self.pg1.remote_ip4)
3875 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3876 self.pg1.add_stream(pkts)
3877 self.pg_enable_capture(self.pg_interfaces)
3879 capture = self.pg0.get_capture(len(pkts))
3880 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3882 ses_num_end = self.nat64_get_ses_num()
3884 self.assertEqual(ses_num_end - ses_num_start, 3)
3886 # tenant with specific VRF
3887 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3888 self.vrf1_nat_addr_n,
3889 vrf_id=self.vrf1_id)
3890 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3892 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3893 self.pg2.add_stream(pkts)
3894 self.pg_enable_capture(self.pg_interfaces)
3896 capture = self.pg1.get_capture(len(pkts))
3897 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3898 dst_ip=self.pg1.remote_ip4)
3900 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3901 self.pg1.add_stream(pkts)
3902 self.pg_enable_capture(self.pg_interfaces)
3904 capture = self.pg2.get_capture(len(pkts))
3905 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3907 def test_static(self):
3908 """ NAT64 static translation test """
3909 self.tcp_port_in = 60303
3910 self.udp_port_in = 60304
3911 self.icmp_id_in = 60305
3912 self.tcp_port_out = 60303
3913 self.udp_port_out = 60304
3914 self.icmp_id_out = 60305
3916 ses_num_start = self.nat64_get_ses_num()
3918 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3920 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3921 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3923 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3928 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3933 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3940 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3941 self.pg0.add_stream(pkts)
3942 self.pg_enable_capture(self.pg_interfaces)
3944 capture = self.pg1.get_capture(len(pkts))
3945 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3946 dst_ip=self.pg1.remote_ip4, same_port=True)
3949 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3950 self.pg1.add_stream(pkts)
3951 self.pg_enable_capture(self.pg_interfaces)
3953 capture = self.pg0.get_capture(len(pkts))
3954 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3955 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3957 ses_num_end = self.nat64_get_ses_num()
3959 self.assertEqual(ses_num_end - ses_num_start, 3)
3961 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3962 def test_session_timeout(self):
3963 """ NAT64 session timeout """
3964 self.icmp_id_in = 1234
3965 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3967 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3968 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3969 self.vapi.nat64_set_timeouts(icmp=5)
3971 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3972 self.pg0.add_stream(pkts)
3973 self.pg_enable_capture(self.pg_interfaces)
3975 capture = self.pg1.get_capture(len(pkts))
3977 ses_num_before_timeout = self.nat64_get_ses_num()
3981 # ICMP session after timeout
3982 ses_num_after_timeout = self.nat64_get_ses_num()
3983 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3985 def test_icmp_error(self):
3986 """ NAT64 ICMP Error message translation """
3987 self.tcp_port_in = 6303
3988 self.udp_port_in = 6304
3989 self.icmp_id_in = 6305
3991 ses_num_start = self.nat64_get_ses_num()
3993 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3995 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3996 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3998 # send some packets to create sessions
3999 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4000 self.pg0.add_stream(pkts)
4001 self.pg_enable_capture(self.pg_interfaces)
4003 capture_ip4 = self.pg1.get_capture(len(pkts))
4004 self.verify_capture_out(capture_ip4,
4005 nat_ip=self.nat_addr,
4006 dst_ip=self.pg1.remote_ip4)
4008 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4009 self.pg1.add_stream(pkts)
4010 self.pg_enable_capture(self.pg_interfaces)
4012 capture_ip6 = self.pg0.get_capture(len(pkts))
4013 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4014 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4015 self.pg0.remote_ip6)
4018 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4019 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4020 ICMPv6DestUnreach(code=1) /
4021 packet[IPv6] for packet in capture_ip6]
4022 self.pg0.add_stream(pkts)
4023 self.pg_enable_capture(self.pg_interfaces)
4025 capture = self.pg1.get_capture(len(pkts))
4026 for packet in capture:
4028 self.assertEqual(packet[IP].src, self.nat_addr)
4029 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4030 self.assertEqual(packet[ICMP].type, 3)
4031 self.assertEqual(packet[ICMP].code, 13)
4032 inner = packet[IPerror]
4033 self.assertEqual(inner.src, self.pg1.remote_ip4)
4034 self.assertEqual(inner.dst, self.nat_addr)
4035 self.check_icmp_checksum(packet)
4036 if inner.haslayer(TCPerror):
4037 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4038 elif inner.haslayer(UDPerror):
4039 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4041 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4043 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4047 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4048 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4049 ICMP(type=3, code=13) /
4050 packet[IP] for packet in capture_ip4]
4051 self.pg1.add_stream(pkts)
4052 self.pg_enable_capture(self.pg_interfaces)
4054 capture = self.pg0.get_capture(len(pkts))
4055 for packet in capture:
4057 self.assertEqual(packet[IPv6].src, ip.src)
4058 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4059 icmp = packet[ICMPv6DestUnreach]
4060 self.assertEqual(icmp.code, 1)
4061 inner = icmp[IPerror6]
4062 self.assertEqual(inner.src, self.pg0.remote_ip6)
4063 self.assertEqual(inner.dst, ip.src)
4064 self.check_icmpv6_checksum(packet)
4065 if inner.haslayer(TCPerror):
4066 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4067 elif inner.haslayer(UDPerror):
4068 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4070 self.assertEqual(inner[ICMPv6EchoRequest].id,
4073 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4076 def test_hairpinning(self):
4077 """ NAT64 hairpinning """
4079 client = self.pg0.remote_hosts[0]
4080 server = self.pg0.remote_hosts[1]
4081 server_tcp_in_port = 22
4082 server_tcp_out_port = 4022
4083 server_udp_in_port = 23
4084 server_udp_out_port = 4023
4085 client_tcp_in_port = 1234
4086 client_udp_in_port = 1235
4087 client_tcp_out_port = 0
4088 client_udp_out_port = 0
4089 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4090 nat_addr_ip6 = ip.src
4092 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4094 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4095 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4097 self.vapi.nat64_add_del_static_bib(server.ip6n,
4100 server_tcp_out_port,
4102 self.vapi.nat64_add_del_static_bib(server.ip6n,
4105 server_udp_out_port,
4110 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4111 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4112 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4114 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4115 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4116 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4118 self.pg0.add_stream(pkts)
4119 self.pg_enable_capture(self.pg_interfaces)
4121 capture = self.pg0.get_capture(len(pkts))
4122 for packet in capture:
4124 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4125 self.assertEqual(packet[IPv6].dst, server.ip6)
4126 if packet.haslayer(TCP):
4127 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4128 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4129 self.check_tcp_checksum(packet)
4130 client_tcp_out_port = packet[TCP].sport
4132 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4133 self.assertEqual(packet[UDP].dport, server_udp_in_port)
4134 self.check_udp_checksum(packet)
4135 client_udp_out_port = packet[UDP].sport
4137 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4142 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4143 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4144 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4146 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4147 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4148 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4150 self.pg0.add_stream(pkts)
4151 self.pg_enable_capture(self.pg_interfaces)
4153 capture = self.pg0.get_capture(len(pkts))
4154 for packet in capture:
4156 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4157 self.assertEqual(packet[IPv6].dst, client.ip6)
4158 if packet.haslayer(TCP):
4159 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
4160 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
4161 self.check_tcp_checksum(packet)
4163 self.assertEqual(packet[UDP].sport, server_udp_out_port)
4164 self.assertEqual(packet[UDP].dport, client_udp_in_port)
4165 self.check_udp_checksum(packet)
4167 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4172 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4173 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4174 ICMPv6DestUnreach(code=1) /
4175 packet[IPv6] for packet in capture]
4176 self.pg0.add_stream(pkts)
4177 self.pg_enable_capture(self.pg_interfaces)
4179 capture = self.pg0.get_capture(len(pkts))
4180 for packet in capture:
4182 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4183 self.assertEqual(packet[IPv6].dst, server.ip6)
4184 icmp = packet[ICMPv6DestUnreach]
4185 self.assertEqual(icmp.code, 1)
4186 inner = icmp[IPerror6]
4187 self.assertEqual(inner.src, server.ip6)
4188 self.assertEqual(inner.dst, nat_addr_ip6)
4189 self.check_icmpv6_checksum(packet)
4190 if inner.haslayer(TCPerror):
4191 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
4192 self.assertEqual(inner[TCPerror].dport,
4193 client_tcp_out_port)
4195 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
4196 self.assertEqual(inner[UDPerror].dport,
4197 client_udp_out_port)
4199 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4202 def test_prefix(self):
4203 """ NAT64 Network-Specific Prefix """
# Exercises NAT64 with a configured global prefix and a second, tenant
# (VRF) specific prefix, checking translation in both directions for each.
#
# Setup: a pool range starting at nat_addr_n, pg0 added with the default
# direction, pg1 added with is_inside=0, then a second pool range bound to
# vrf1_id and pg2 added with the default direction.
# NOTE(review): the default direction is presumably "inside" -- confirm.
4205 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4207 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4208 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4209 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4210 self.vrf1_nat_addr_n,
4211 vrf_id=self.vrf1_id)
4212 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# Add the global prefix (no vrf_id passed) and confirm the dump returns
# exactly that prefix, its length and vrf_id 0.
4215 global_pref64 = "2001:db8::"
4216 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
4217 global_pref64_len = 32
4218 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
4220 prefix = self.vapi.nat64_prefix_dump()
4221 self.assertEqual(len(prefix), 1)
4222 self.assertEqual(prefix[0].prefix, global_pref64_n)
4223 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
4224 self.assertEqual(prefix[0].vrf_id, 0)
4226 # Add tenant specific prefix
4227 vrf1_pref64 = "2001:db8:122:300::"
4228 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
4229 vrf1_pref64_len = 56
4230 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
4232 vrf_id=self.vrf1_id)
# Both prefixes must now be reported by the dump.
4233 prefix = self.vapi.nat64_prefix_dump()
4234 self.assertEqual(len(prefix), 2)
# Global prefix: send an IPv6 stream in on pg0 and expect it on pg1
# translated to nat_addr; then send the reverse stream from pg1 and expect
# it on pg0 with an IPv6 address composed from pg1's IPv4 address.
4237 pkts = self.create_stream_in_ip6(self.pg0,
4240 plen=global_pref64_len)
4241 self.pg0.add_stream(pkts)
4242 self.pg_enable_capture(self.pg_interfaces)
4244 capture = self.pg1.get_capture(len(pkts))
4245 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4246 dst_ip=self.pg1.remote_ip4)
4248 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4249 self.pg1.add_stream(pkts)
4250 self.pg_enable_capture(self.pg_interfaces)
4252 capture = self.pg0.get_capture(len(pkts))
4253 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4256 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
# Same round trip for the tenant: traffic enters on pg2, is expected on pg1
# translated to vrf1_nat_addr, and the reverse stream is expected on pg2.
4258 # Tenant specific prefix
4259 pkts = self.create_stream_in_ip6(self.pg2,
4262 plen=vrf1_pref64_len)
4263 self.pg2.add_stream(pkts)
4264 self.pg_enable_capture(self.pg_interfaces)
4266 capture = self.pg1.get_capture(len(pkts))
4267 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4268 dst_ip=self.pg1.remote_ip4)
4270 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4271 self.pg1.add_stream(pkts)
4272 self.pg_enable_capture(self.pg_interfaces)
4274 capture = self.pg2.get_capture(len(pkts))
4275 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4278 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4280 def test_unknown_proto(self):
4281 """ NAT64 translate packet with unknown protocol """
# Sends packets whose payload protocol is GRE (IPv6 next header 47) through
# NAT64 in both directions and checks the translated outer header.
#
# Setup: NAT pool, pg0 added with the default direction, pg1 added with
# is_inside=0; remote_ip6 is pg1's IPv4 address embedded in 64:ff9b::/96.
4283 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4285 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4286 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4287 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# A plain TCP packet goes first; it is only captured on pg1 and none of its
# fields are asserted here.
# NOTE(review): presumably this establishes the session that the GRE
# packets below rely on -- confirm.
4290 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4291 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4292 TCP(sport=self.tcp_port_in, dport=20))
4293 self.pg0.add_stream(p)
4294 self.pg_enable_capture(self.pg_interfaces)
4296 p = self.pg1.get_capture(1)
# pg0 -> pg1: IPv6 packet with nh=47; the translated packet must be IPv4
# from nat_addr to pg1's remote address, still carry a GRE layer and have a
# valid IP checksum.
4298 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4299 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4301 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4302 TCP(sport=1234, dport=1234))
4303 self.pg0.add_stream(p)
4304 self.pg_enable_capture(self.pg_interfaces)
4306 p = self.pg1.get_capture(1)
# NOTE(review): `packet` is assigned on a line not present in this listing;
# presumably the single captured packet -- confirm.
4309 self.assertEqual(packet[IP].src, self.nat_addr)
4310 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4311 self.assertTrue(packet.haslayer(GRE))
4312 self.check_ip_checksum(packet)
4314 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0: IPv4 packet addressed to nat_addr; the translated packet must
# be IPv6 from remote_ip6 to pg0's remote address with next header 47.
4318 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4319 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4321 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4322 TCP(sport=1234, dport=1234))
4323 self.pg1.add_stream(p)
4324 self.pg_enable_capture(self.pg_interfaces)
4326 p = self.pg0.get_capture(1)
4329 self.assertEqual(packet[IPv6].src, remote_ip6)
4330 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4331 self.assertEqual(packet[IPv6].nh, 47)
4333 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4336 def test_hairpinning_unknown_proto(self):
4337 """ NAT64 translate packet with unknown protocol - hairpinning """
# Hairpinning variant of the unknown-protocol test: client and server are
# both remote hosts on pg0, so every translated packet is captured on pg0.
4339 client = self.pg0.remote_hosts[0]
4340 server = self.pg0.remote_hosts[1]
4341 server_tcp_in_port = 22
4342 server_tcp_out_port = 4022
4343 client_tcp_in_port = 1234
4344 client_tcp_out_port = 1235
4345 server_nat_ip = "10.0.0.100"
4346 client_nat_ip = "10.0.0.110"
4347 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
4348 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
# IPv6 forms of the two NAT addresses, embedded in 64:ff9b::/96.
4349 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
4350 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
4352 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
4354 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4355 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static BIB calls: two for the server address and one for the client.
# NOTE(review): most arguments of these calls are not present in this
# listing; presumably they map the *_in_port/*_out_port values above onto
# the NAT addresses -- confirm.
4357 self.vapi.nat64_add_del_static_bib(server.ip6n,
4360 server_tcp_out_port,
4363 self.vapi.nat64_add_del_static_bib(server.ip6n,
4369 self.vapi.nat64_add_del_static_bib(client.ip6n,
4372 client_tcp_out_port,
# A TCP packet from client to the server's NAT address goes first; it is
# only captured, none of its fields are asserted here.
4376 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4377 IPv6(src=client.ip6, dst=server_nat_ip6) /
4378 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4379 self.pg0.add_stream(p)
4380 self.pg_enable_capture(self.pg_interfaces)
4382 p = self.pg0.get_capture(1)
# client -> server with next header GRE: expected back on pg0 from the
# client's NAT IPv6 address to the server's real IPv6 address, nh still GRE.
4384 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4385 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
4387 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4388 TCP(sport=1234, dport=1234))
4389 self.pg0.add_stream(p)
4390 self.pg_enable_capture(self.pg_interfaces)
4392 p = self.pg0.get_capture(1)
# NOTE(review): `packet` is assigned on a line not present in this listing.
4395 self.assertEqual(packet[IPv6].src, client_nat_ip6)
4396 self.assertEqual(packet[IPv6].dst, server.ip6)
4397 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4399 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> client with next header GRE: expected back on pg0 from the
# server's NAT IPv6 address to the client's real IPv6 address.
4403 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4404 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
4406 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4407 TCP(sport=1234, dport=1234))
4408 self.pg0.add_stream(p)
4409 self.pg_enable_capture(self.pg_interfaces)
4411 p = self.pg0.get_capture(1)
4414 self.assertEqual(packet[IPv6].src, server_nat_ip6)
4415 self.assertEqual(packet[IPv6].dst, client.ip6)
4416 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4418 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4421 def test_one_armed_nat64(self):
4422 """ One armed NAT64 """
# NAT64 where a single interface (pg3) carries both directions: packets are
# sent on pg3 and the translated packets are captured on pg3 as well.
4424 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
4428 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
# pg3 is added twice: once with the default direction and once with
# is_inside=0.
4430 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
4431 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
# IPv6 -> IPv4: TCP from pg3's remote IPv6 host to remote_host_ip6.
4434 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4435 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
4436 TCP(sport=12345, dport=80))
4437 self.pg3.add_stream(p)
4438 self.pg_enable_capture(self.pg_interfaces)
4440 capture = self.pg3.get_capture(1)
# NOTE(review): `ip`, `tcp` and the rebinding of `p` to the captured packet
# happen on lines not present in this listing -- confirm.
# Expected: source rewritten to nat_addr with a source port different from
# 12345; that port is remembered for the reply below.
4445 self.assertEqual(ip.src, self.nat_addr)
4446 self.assertEqual(ip.dst, self.pg3.remote_ip4)
4447 self.assertNotEqual(tcp.sport, 12345)
4448 external_port = tcp.sport
4449 self.assertEqual(tcp.dport, 80)
4450 self.check_tcp_checksum(p)
4451 self.check_ip_checksum(p)
4453 self.logger.error(ppp("Unexpected or invalid packet:", p))
# IPv4 -> IPv6: the reply is addressed to nat_addr:external_port and must
# come back as IPv6 from remote_host_ip6 to pg3's remote IPv6 host with the
# original port 12345 restored.
4457 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4458 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
4459 TCP(sport=80, dport=external_port))
4460 self.pg3.add_stream(p)
4461 self.pg_enable_capture(self.pg_interfaces)
4463 capture = self.pg3.get_capture(1)
4468 self.assertEqual(ip.src, remote_host_ip6)
4469 self.assertEqual(ip.dst, self.pg3.remote_ip6)
4470 self.assertEqual(tcp.sport, 80)
4471 self.assertEqual(tcp.dport, 12345)
4472 self.check_tcp_checksum(p)
4474 self.logger.error(ppp("Unexpected or invalid packet:", p))
4477 def test_frag_in_order(self):
4478 """ NAT64 translate fragments arriving in order """
# Sends a fragmented TCP payload through NAT64 in each direction,
# reassembles the captured fragments and compares ports and payload.
# The inside source port is randomised for each run.
4479 self.tcp_port_in = random.randint(1025, 65535)
4481 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4483 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4484 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Remember how many reassembly entries exist before any traffic is sent.
4486 reass = self.vapi.nat_reass_dump()
4487 reass_n_start = len(reass)
# pg0 -> pg1: IPv6 fragments in, IPv4 fragments expected on pg1.
# NOTE(review): `data` for this direction is assigned on a line not present
# in this listing.
4491 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4492 self.tcp_port_in, 20, data)
4493 self.pg0.add_stream(pkts)
4494 self.pg_enable_capture(self.pg_interfaces)
4496 frags = self.pg1.get_capture(len(pkts))
4497 p = self.reass_frags_and_verify(frags,
4499 self.pg1.remote_ip4)
# The translated source port must differ from the inside port; it is stored
# on the instance as tcp_port_out.
4500 self.assertEqual(p[TCP].dport, 20)
4501 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4502 self.tcp_port_out = p[TCP].sport
4503 self.assertEqual(data, p[Raw].load)
# pg1 -> pg0: IPv4 fragments in, IPv6 fragments expected on pg0 with the
# source composed from pg1's IPv4 address in 64:ff9b::/96.
4506 data = "A" * 4 + "b" * 16 + "C" * 3
4507 pkts = self.create_stream_frag(self.pg1,
4512 self.pg1.add_stream(pkts)
4513 self.pg_enable_capture(self.pg_interfaces)
4515 frags = self.pg0.get_capture(len(pkts))
4516 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4517 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4518 self.assertEqual(p[TCP].sport, 20)
4519 self.assertEqual(p[TCP].dport, self.tcp_port_in)
4520 self.assertEqual(data, p[Raw].load)
# Exactly two more reassembly entries are expected than before the test
# (presumably one per direction -- confirm).
4522 reass = self.vapi.nat_reass_dump()
4523 reass_n_end = len(reass)
4525 self.assertEqual(reass_n_end - reass_n_start, 2)
4527 def test_reass_hairpinning(self):
4528 """ NAT64 fragments hairpinning """
# Fragmented traffic between two hosts that both sit on pg0: fragments are
# sent on pg0 and the translated fragments are captured on pg0.
4530 client = self.pg0.remote_hosts[0]
4531 server = self.pg0.remote_hosts[1]
4532 server_in_port = random.randint(1025, 65535)
4533 server_out_port = random.randint(1025, 65535)
4534 client_in_port = random.randint(1025, 65535)
# Build an IPv6 packet only to read back its src field, i.e. the NAT
# address written as '64:ff9b::' + dotted IPv4.
# NOTE(review): presumably scapy normalises the address text -- confirm.
4535 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4536 nat_addr_ip6 = ip.src
4538 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4540 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4541 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4543 # add static BIB entry for server
4544 self.vapi.nat64_add_del_static_bib(server.ip6n,
4550 # send packet from host to server
# NOTE(review): `data` and the remaining arguments of this call are on
# lines not present in this listing.
4551 pkts = self.create_stream_frag_ip6(self.pg0,
4556 self.pg0.add_stream(pkts)
4557 self.pg_enable_capture(self.pg_interfaces)
4559 frags = self.pg0.get_capture(len(pkts))
# The reassembled packet must go from the NAT IPv6 address to the server,
# with a translated source port and the server's inside port as target.
4560 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
4561 self.assertNotEqual(p[TCP].sport, client_in_port)
4562 self.assertEqual(p[TCP].dport, server_in_port)
4563 self.assertEqual(data, p[Raw].load)
4565 def test_frag_out_of_order(self):
4566 """ NAT64 translate fragments arriving out of order """
# Same round trip as the in-order fragment test, without the reassembly
# entry count check.
# NOTE(review): the statements that reorder the fragments before sending
# are not present in this listing -- confirm against the full file.
4567 self.tcp_port_in = random.randint(1025, 65535)
4569 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4571 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4572 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# pg0 -> pg1: IPv6 fragments in, IPv4 fragments expected on pg1.
4576 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4577 self.tcp_port_in, 20, data)
4579 self.pg0.add_stream(pkts)
4580 self.pg_enable_capture(self.pg_interfaces)
4582 frags = self.pg1.get_capture(len(pkts))
4583 p = self.reass_frags_and_verify(frags,
4585 self.pg1.remote_ip4)
# The translated source port must differ from the inside port; it is stored
# on the instance as tcp_port_out.
4586 self.assertEqual(p[TCP].dport, 20)
4587 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4588 self.tcp_port_out = p[TCP].sport
4589 self.assertEqual(data, p[Raw].load)
# pg1 -> pg0: IPv4 fragments in, IPv6 fragments expected on pg0 with the
# source composed from pg1's IPv4 address in 64:ff9b::/96.
4592 data = "A" * 4 + "B" * 16 + "C" * 3
4593 pkts = self.create_stream_frag(self.pg1,
4599 self.pg1.add_stream(pkts)
4600 self.pg_enable_capture(self.pg_interfaces)
4602 frags = self.pg0.get_capture(len(pkts))
4603 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4604 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4605 self.assertEqual(p[TCP].sport, 20)
4606 self.assertEqual(p[TCP].dport, self.tcp_port_in)
4607 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface """
    # Register pg4 as the interface the NAT64 pool address is taken from;
    # pg4 has no IPv4 address configured at this point.
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (dump the NAT64 pool itself; the NAT44 pool says nothing about it)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    # The assertion must use the dump taken after the removal, otherwise a
    # stale address left in the pool would go unnoticed.
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
4628 def nat64_get_ses_num(self):
4630 Return number of active NAT64 sessions.
# Fetches the NAT64 session table dump from the VPP API.
# NOTE(review): the statement that turns `st` into the returned count is
# not present in this listing; presumably its length -- confirm.
4632 st = self.vapi.nat64_st_dump()
4635 def clear_nat64(self):
4637 Clear NAT64 configuration.
# Walks every kind of NAT64 state exposed by the API dumps (interfaces,
# TCP/UDP/ICMP BIB entries, pool addresses, prefixes) and issues the
# matching add_del call for each item.
# NOTE(review): the is_add=0 style arguments that make these calls
# deletions are on lines not present in this listing -- confirm.
#
# Called without arguments; presumably restores the default timeouts
# -- confirm.
4639 self.vapi.nat64_set_timeouts()
4641 interfaces = self.vapi.nat64_interface_dump()
4642 for intf in interfaces:
# NOTE(review): is_inside > 1 presumably marks an interface configured as
# both inside and outside, which therefore needs a second removal call
# -- confirm against the API definition.
4643 if intf.is_inside > 1:
4644 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4647 self.vapi.nat64_add_del_interface(intf.sw_if_index,
# BIB entries are dumped and processed per protocol: TCP, UDP, then ICMP.
4651 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4654 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4662 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
4665 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4673 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
4676 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
# Pool addresses: one call per dumped address.
4684 adresses = self.vapi.nat64_pool_addr_dump()
4685 for addr in adresses:
4686 self.vapi.nat64_add_del_pool_addr_range(addr.address,
# Prefixes: each one is addressed together with the VRF it belongs to.
4691 prefixes = self.vapi.nat64_prefix_dump()
4692 for prefix in prefixes:
4693 self.vapi.nat64_add_del_prefix(prefix.prefix,
4695 vrf_id=prefix.vrf_id,
# Per-test teardown of TestNAT64 (the `def` line is not present in this
# listing): run the base-class teardown, then, if VPP is still alive, log
# the NAT64 state reported by the CLI show commands below.
4699 super(TestNAT64, self).tearDown()
4700 if not self.vpp_dead:
4701 self.logger.info(self.vapi.cli("show nat64 pool"))
4702 self.logger.info(self.vapi.cli("show nat64 interfaces"))
4703 self.logger.info(self.vapi.cli("show nat64 prefix"))
4704 self.logger.info(self.vapi.cli("show nat64 bib all"))
4705 self.logger.info(self.vapi.cli("show nat64 session table all"))
4706 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4710 class TestDSlite(MethodHolder):
4711 """ DS-Lite Test Cases """
# Topology built in setUpClass: pg0 is the IPv4 side (address configured,
# ARP resolved); pg1 is the IPv6 side with two generated remote hosts and
# configured IPv6 neighbors. nat_addr is the IPv4 address used for the
# DS-Lite pool.
4714 def setUpClass(cls):
4715 super(TestDSlite, cls).setUpClass()
4718 cls.nat_addr = '10.0.0.3'
4719 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4721 cls.create_pg_interfaces(range(2))
4723 cls.pg0.config_ip4()
4724 cls.pg0.resolve_arp()
4726 cls.pg1.config_ip6()
4727 cls.pg1.generate_remote_hosts(2)
4728 cls.pg1.configure_ipv6_neighbors()
# NOTE(review): this tearDownClass call presumably sits in an error handler
# around the setup above; the surrounding lines are not present in this
# listing -- confirm.
4731 super(TestDSlite, cls).tearDownClass()
4734 def test_dslite(self):
4735 """ Test DS-Lite """
# Round-trips UDP, TCP and ICMP echo through the AFTR, then pings the AFTR
# tunnel endpoint itself.
# Tunnelled direction: IPv4-in-IPv6 arrives on pg1 and plain IPv4 is
# expected on pg0. Reply direction: plain IPv4 arrives on pg0 and
# IPv4-in-IPv6 is expected on pg1.
4736 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
# Configure the AFTR tunnel endpoint with its IPv6 and IPv4 addresses.
4738 aftr_ip4 = '192.0.0.1'
4739 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
4740 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
4741 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
4742 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# UDP from remote host 0: inner source 192.168.1.1:20000, tunnelled to the
# AFTR IPv6 address.
4745 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4746 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
4747 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4748 UDP(sport=20000, dport=10000))
4749 self.pg1.add_stream(p)
4750 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: no IPv6 layer, source rewritten to nat_addr and a source
# port different from 20000.
4752 capture = self.pg0.get_capture(1)
4753 capture = capture[0]
4754 self.assertFalse(capture.haslayer(IPv6))
4755 self.assertEqual(capture[IP].src, self.nat_addr)
4756 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4757 self.assertNotEqual(capture[UDP].sport, 20000)
4758 self.assertEqual(capture[UDP].dport, 10000)
4759 self.check_ip_checksum(capture)
# Remember the translated port so the reply can be addressed to it.
4760 out_port = capture[UDP].sport
# UDP reply to nat_addr:out_port: expected on pg1 inside IPv6 from the AFTR
# to remote host 0, with the inner destination and port restored.
4762 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4763 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4764 UDP(sport=10000, dport=out_port))
4765 self.pg0.add_stream(p)
4766 self.pg_enable_capture(self.pg_interfaces)
4768 capture = self.pg1.get_capture(1)
4769 capture = capture[0]
4770 self.assertEqual(capture[IPv6].src, aftr_ip6)
4771 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
4772 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4773 self.assertEqual(capture[IP].dst, '192.168.1.1')
4774 self.assertEqual(capture[UDP].sport, 10000)
4775 self.assertEqual(capture[UDP].dport, 20000)
4776 self.check_ip_checksum(capture)
# TCP from remote host 1, using the same inner source address 192.168.1.1
# as the UDP flow from host 0.
4779 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4780 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4781 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4782 TCP(sport=20001, dport=10001))
4783 self.pg1.add_stream(p)
4784 self.pg_enable_capture(self.pg_interfaces)
4786 capture = self.pg0.get_capture(1)
4787 capture = capture[0]
4788 self.assertFalse(capture.haslayer(IPv6))
4789 self.assertEqual(capture[IP].src, self.nat_addr)
4790 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4791 self.assertNotEqual(capture[TCP].sport, 20001)
4792 self.assertEqual(capture[TCP].dport, 10001)
4793 self.check_ip_checksum(capture)
4794 self.check_tcp_checksum(capture)
4795 out_port = capture[TCP].sport
# TCP reply: expected on pg1 inside IPv6 addressed to remote host 1.
4797 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4798 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4799 TCP(sport=10001, dport=out_port))
4800 self.pg0.add_stream(p)
4801 self.pg_enable_capture(self.pg_interfaces)
4803 capture = self.pg1.get_capture(1)
4804 capture = capture[0]
4805 self.assertEqual(capture[IPv6].src, aftr_ip6)
4806 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4807 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4808 self.assertEqual(capture[IP].dst, '192.168.1.1')
4809 self.assertEqual(capture[TCP].sport, 10001)
4810 self.assertEqual(capture[TCP].dport, 20001)
4811 self.check_ip_checksum(capture)
4812 self.check_tcp_checksum(capture)
# ICMP echo request from remote host 1: the echo id (4000) is expected to
# be translated to a different value on the IPv4 side.
4815 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4816 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4817 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4818 ICMP(id=4000, type='echo-request'))
4819 self.pg1.add_stream(p)
4820 self.pg_enable_capture(self.pg_interfaces)
4822 capture = self.pg0.get_capture(1)
4823 capture = capture[0]
4824 self.assertFalse(capture.haslayer(IPv6))
4825 self.assertEqual(capture[IP].src, self.nat_addr)
4826 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4827 self.assertNotEqual(capture[ICMP].id, 4000)
4828 self.check_ip_checksum(capture)
4829 self.check_icmp_checksum(capture)
4830 out_id = capture[ICMP].id
# Echo reply carrying the translated id: expected on pg1 inside IPv6 with
# the original id 4000 restored.
4832 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4833 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4834 ICMP(id=out_id, type='echo-reply'))
4835 self.pg0.add_stream(p)
4836 self.pg_enable_capture(self.pg_interfaces)
4838 capture = self.pg1.get_capture(1)
4839 capture = capture[0]
4840 self.assertEqual(capture[IPv6].src, aftr_ip6)
4841 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4842 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4843 self.assertEqual(capture[IP].dst, '192.168.1.1')
4844 self.assertEqual(capture[ICMP].id, 4000)
4845 self.check_ip_checksum(capture)
4846 self.check_icmp_checksum(capture)
4848 # ping DS-Lite AFTR tunnel endpoint address
4849 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4850 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
4851 ICMPv6EchoRequest())
4852 self.pg1.add_stream(p)
4853 self.pg_enable_capture(self.pg_interfaces)
# The echo reply must come back on pg1 from the AFTR address to the host.
4855 capture = self.pg1.get_capture(1)
4856 self.assertEqual(1, len(capture))
4857 capture = capture[0]
4858 self.assertEqual(capture[IPv6].src, aftr_ip6)
4859 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4860 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Per-test teardown (its `def` line is not present in this listing): after
# the base-class teardown, log the DS-Lite state if VPP is still alive.
4863 super(TestDSlite, self).tearDown()
4864 if not self.vpp_dead:
4865 self.logger.info(self.vapi.cli("show dslite pool"))
4867 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
4868 self.logger.info(self.vapi.cli("show dslite sessions"))
# Script entry point: when the file is executed directly, run the test
# cases through unittest using the VPP test runner.
4870 if __name__ == '__main__':
4871 unittest.main(testRunner=VppTestRunner)