7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
20 class MethodHolder(VppTestCase):
21 """ SNAT create capture and verify method holder """
25 super(MethodHolder, cls).setUpClass()
28 super(MethodHolder, self).tearDown()
30 def check_tcp_checksum(self, pkt):
32 Check TCP checksum in IP packet
34 :param pkt: Packet to check TCP checksum
36 new = pkt.__class__(str(pkt))
38 new = new.__class__(str(new))
39 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
41 def create_stream_in(self, in_if, out_if, ttl=64):
43 Create packet stream for inside network
45 :param in_if: Inside interface
46 :param out_if: Outside interface
47 :param ttl: TTL of generated packets
51 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
52 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
53 TCP(sport=self.tcp_port_in, dport=20))
57 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
58 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
59 UDP(sport=self.udp_port_in, dport=20))
63 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
64 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
65 ICMP(id=self.icmp_id_in, type='echo-request'))
70 def create_stream_in_ip6(self, in_if, out_if, hlim=64):
72 Create IPv6 packet stream for inside network
74 :param in_if: Inside interface
75 :param out_if: Outside interface
76 :param hlim: Hop Limit of generated packets
79 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
81 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
82 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
83 TCP(sport=self.tcp_port_in, dport=20))
87 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
88 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
89 UDP(sport=self.udp_port_in, dport=20))
93 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
94 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
95 ICMPv6EchoRequest(id=self.icmp_id_in))
100 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
102 Create packet stream for outside network
104 :param out_if: Outside interface
105 :param dst_ip: Destination IP address (Default use global SNAT address)
106 :param ttl: TTL of generated packets
109 dst_ip = self.snat_addr
112 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
113 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
114 TCP(dport=self.tcp_port_out, sport=20))
118 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
119 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
120 UDP(dport=self.udp_port_out, sport=20))
124 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
125 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
126 ICMP(id=self.icmp_id_out, type='echo-reply'))
131 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
132 packet_num=3, dst_ip=None):
134 Verify captured packets on outside network
136 :param capture: Captured packets
137 :param nat_ip: Translated IP address (Default use global SNAT address)
138 :param same_port: Source port number is not translated (Default False)
139 :param packet_num: Expected number of packets (Default 3)
140 :param dst_ip: Destination IP address (Default do not verify)
143 nat_ip = self.snat_addr
144 self.assertEqual(packet_num, len(capture))
145 for packet in capture:
147 self.assertEqual(packet[IP].src, nat_ip)
148 if dst_ip is not None:
149 self.assertEqual(packet[IP].dst, dst_ip)
150 if packet.haslayer(TCP):
152 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
155 packet[TCP].sport, self.tcp_port_in)
156 self.tcp_port_out = packet[TCP].sport
157 elif packet.haslayer(UDP):
159 self.assertEqual(packet[UDP].sport, self.udp_port_in)
162 packet[UDP].sport, self.udp_port_in)
163 self.udp_port_out = packet[UDP].sport
166 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
168 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
169 self.icmp_id_out = packet[ICMP].id
171 self.logger.error(ppp("Unexpected or invalid packet "
172 "(outside network):", packet))
175 def verify_capture_in(self, capture, in_if, packet_num=3):
177 Verify captured packets on inside network
179 :param capture: Captured packets
180 :param in_if: Inside interface
181 :param packet_num: Expected number of packets (Default 3)
183 self.assertEqual(packet_num, len(capture))
184 for packet in capture:
186 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
187 if packet.haslayer(TCP):
188 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
189 elif packet.haslayer(UDP):
190 self.assertEqual(packet[UDP].dport, self.udp_port_in)
192 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
194 self.logger.error(ppp("Unexpected or invalid packet "
195 "(inside network):", packet))
198 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
200 Verify captured IPv6 packets on inside network
202 :param capture: Captured packets
203 :param src_ip: Source IP
204 :param dst_ip: Destination IP address
205 :param packet_num: Expected number of packets (Default 3)
207 self.assertEqual(packet_num, len(capture))
208 for packet in capture:
210 self.assertEqual(packet[IPv6].src, src_ip)
211 self.assertEqual(packet[IPv6].dst, dst_ip)
212 if packet.haslayer(TCP):
213 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
214 elif packet.haslayer(UDP):
215 self.assertEqual(packet[UDP].dport, self.udp_port_in)
217 self.assertEqual(packet[ICMPv6EchoReply].id,
220 self.logger.error(ppp("Unexpected or invalid packet "
221 "(inside network):", packet))
224 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
226 Verify captured packets that don't have to be translated
228 :param capture: Captured packets
229 :param ingress_if: Ingress interface
230 :param egress_if: Egress interface
232 for packet in capture:
234 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
235 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
236 if packet.haslayer(TCP):
237 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
238 elif packet.haslayer(UDP):
239 self.assertEqual(packet[UDP].sport, self.udp_port_in)
241 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
243 self.logger.error(ppp("Unexpected or invalid packet "
244 "(inside network):", packet))
247 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
248 packet_num=3, icmp_type=11):
250 Verify captured packets with ICMP errors on outside network
252 :param capture: Captured packets
253 :param src_ip: Translated IP address or IP address of VPP
254 (Default use global SNAT address)
255 :param packet_num: Expected number of packets (Default 3)
256 :param icmp_type: Type of error ICMP packet
257 we are expecting (Default 11)
260 src_ip = self.snat_addr
261 self.assertEqual(packet_num, len(capture))
262 for packet in capture:
264 self.assertEqual(packet[IP].src, src_ip)
265 self.assertTrue(packet.haslayer(ICMP))
267 self.assertEqual(icmp.type, icmp_type)
268 self.assertTrue(icmp.haslayer(IPerror))
269 inner_ip = icmp[IPerror]
270 if inner_ip.haslayer(TCPerror):
271 self.assertEqual(inner_ip[TCPerror].dport,
273 elif inner_ip.haslayer(UDPerror):
274 self.assertEqual(inner_ip[UDPerror].dport,
277 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
279 self.logger.error(ppp("Unexpected or invalid packet "
280 "(outside network):", packet))
283 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
286 Verify captured packets with ICMP errors on inside network
288 :param capture: Captured packets
289 :param in_if: Inside interface
290 :param packet_num: Expected number of packets (Default 3)
291 :param icmp_type: Type of error ICMP packet
292 we are expecting (Default 11)
294 self.assertEqual(packet_num, len(capture))
295 for packet in capture:
297 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
298 self.assertTrue(packet.haslayer(ICMP))
300 self.assertEqual(icmp.type, icmp_type)
301 self.assertTrue(icmp.haslayer(IPerror))
302 inner_ip = icmp[IPerror]
303 if inner_ip.haslayer(TCPerror):
304 self.assertEqual(inner_ip[TCPerror].sport,
306 elif inner_ip.haslayer(UDPerror):
307 self.assertEqual(inner_ip[UDPerror].sport,
310 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
312 self.logger.error(ppp("Unexpected or invalid packet "
313 "(inside network):", packet))
316 def verify_ipfix_nat44_ses(self, data):
318 Verify IPFIX NAT44 session create/delete event
320 :param data: Decoded IPFIX data records
322 nat44_ses_create_num = 0
323 nat44_ses_delete_num = 0
324 self.assertEqual(6, len(data))
327 self.assertIn(ord(record[230]), [4, 5])
328 if ord(record[230]) == 4:
329 nat44_ses_create_num += 1
331 nat44_ses_delete_num += 1
333 self.assertEqual(self.pg0.remote_ip4n, record[8])
334 # postNATSourceIPv4Address
335 self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
338 self.assertEqual(struct.pack("!I", 0), record[234])
339 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
340 if IP_PROTOS.icmp == ord(record[4]):
341 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
342 self.assertEqual(struct.pack("!H", self.icmp_id_out),
344 elif IP_PROTOS.tcp == ord(record[4]):
345 self.assertEqual(struct.pack("!H", self.tcp_port_in),
347 self.assertEqual(struct.pack("!H", self.tcp_port_out),
349 elif IP_PROTOS.udp == ord(record[4]):
350 self.assertEqual(struct.pack("!H", self.udp_port_in),
352 self.assertEqual(struct.pack("!H", self.udp_port_out),
355 self.fail("Invalid protocol")
356 self.assertEqual(3, nat44_ses_create_num)
357 self.assertEqual(3, nat44_ses_delete_num)
359 def verify_ipfix_addr_exhausted(self, data):
361 Verify IPFIX NAT addresses event
363 :param data: Decoded IPFIX data records
365 self.assertEqual(1, len(data))
368 self.assertEqual(ord(record[230]), 3)
370 self.assertEqual(struct.pack("!I", 0), record[283])
373 class TestSNAT(MethodHolder):
374 """ SNAT Test Cases """
378 super(TestSNAT, cls).setUpClass()
381 cls.tcp_port_in = 6303
382 cls.tcp_port_out = 6303
383 cls.udp_port_in = 6304
384 cls.udp_port_out = 6304
385 cls.icmp_id_in = 6305
386 cls.icmp_id_out = 6305
387 cls.snat_addr = '10.0.0.3'
388 cls.ipfix_src_port = 4739
389 cls.ipfix_domain_id = 1
391 cls.create_pg_interfaces(range(9))
392 cls.interfaces = list(cls.pg_interfaces[0:4])
394 for i in cls.interfaces:
399 cls.pg0.generate_remote_hosts(3)
400 cls.pg0.configure_ipv4_neighbors()
402 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
404 cls.pg4._local_ip4 = "172.16.255.1"
405 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
406 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
407 cls.pg4.set_table_ip4(10)
408 cls.pg5._local_ip4 = "172.16.255.3"
409 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
410 cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
411 cls.pg5.set_table_ip4(10)
412 cls.pg6._local_ip4 = "172.16.255.1"
413 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
414 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
415 cls.pg6.set_table_ip4(20)
416 for i in cls.overlapping_interfaces:
425 super(TestSNAT, cls).tearDownClass()
428 def clear_snat(self):
430 Clear SNAT configuration.
432 # I found no elegant way to do this
433 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
434 dst_address_length=32,
435 next_hop_address=self.pg7.remote_ip4n,
436 next_hop_sw_if_index=self.pg7.sw_if_index,
438 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
439 dst_address_length=32,
440 next_hop_address=self.pg8.remote_ip4n,
441 next_hop_sw_if_index=self.pg8.sw_if_index,
444 for intf in [self.pg7, self.pg8]:
445 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
447 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
452 if self.pg7.has_ip4_config:
453 self.pg7.unconfig_ip4()
455 interfaces = self.vapi.snat_interface_addr_dump()
456 for intf in interfaces:
457 self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
459 self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
460 domain_id=self.ipfix_domain_id)
461 self.ipfix_src_port = 4739
462 self.ipfix_domain_id = 1
464 interfaces = self.vapi.snat_interface_dump()
465 for intf in interfaces:
466 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
470 static_mappings = self.vapi.snat_static_mapping_dump()
471 for sm in static_mappings:
472 self.vapi.snat_add_static_mapping(sm.local_ip_address,
473 sm.external_ip_address,
474 local_port=sm.local_port,
475 external_port=sm.external_port,
476 addr_only=sm.addr_only,
478 protocol=sm.protocol,
481 adresses = self.vapi.snat_address_dump()
482 for addr in adresses:
483 self.vapi.snat_add_address_range(addr.ip_address,
487 def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
488 local_port=0, external_port=0, vrf_id=0,
489 is_add=1, external_sw_if_index=0xFFFFFFFF,
492 Add/delete S-NAT static mapping
494 :param local_ip: Local IP address
495 :param external_ip: External IP address
496 :param local_port: Local port number (Optional)
497 :param external_port: External port number (Optional)
498 :param vrf_id: VRF ID (Default 0)
499 :param is_add: 1 if add, 0 if delete (Default add)
500 :param external_sw_if_index: External interface instead of IP address
501 :param proto: IP protocol (Mandatory if port specified)
504 if local_port and external_port:
506 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
507 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
508 self.vapi.snat_add_static_mapping(
511 external_sw_if_index,
519 def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
521 Add/delete S-NAT address
523 :param ip: IP address
524 :param is_add: 1 if add, 0 if delete (Default add)
526 snat_addr = socket.inet_pton(socket.AF_INET, ip)
527 self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
530 def test_dynamic(self):
531 """ SNAT dynamic translation test """
533 self.snat_add_address(self.snat_addr)
534 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
535 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
539 pkts = self.create_stream_in(self.pg0, self.pg1)
540 self.pg0.add_stream(pkts)
541 self.pg_enable_capture(self.pg_interfaces)
543 capture = self.pg1.get_capture(len(pkts))
544 self.verify_capture_out(capture)
547 pkts = self.create_stream_out(self.pg1)
548 self.pg1.add_stream(pkts)
549 self.pg_enable_capture(self.pg_interfaces)
551 capture = self.pg0.get_capture(len(pkts))
552 self.verify_capture_in(capture, self.pg0)
554 def test_dynamic_icmp_errors_in2out_ttl_1(self):
555 """ SNAT handling of client packets with TTL=1 """
557 self.snat_add_address(self.snat_addr)
558 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
559 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
562 # Client side - generate traffic
563 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
564 self.pg0.add_stream(pkts)
565 self.pg_enable_capture(self.pg_interfaces)
568 # Client side - verify ICMP type 11 packets
569 capture = self.pg0.get_capture(len(pkts))
570 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
572 def test_dynamic_icmp_errors_out2in_ttl_1(self):
573 """ SNAT handling of server packets with TTL=1 """
575 self.snat_add_address(self.snat_addr)
576 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
577 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
580 # Client side - create sessions
581 pkts = self.create_stream_in(self.pg0, self.pg1)
582 self.pg0.add_stream(pkts)
583 self.pg_enable_capture(self.pg_interfaces)
586 # Server side - generate traffic
587 capture = self.pg1.get_capture(len(pkts))
588 self.verify_capture_out(capture)
589 pkts = self.create_stream_out(self.pg1, ttl=1)
590 self.pg1.add_stream(pkts)
591 self.pg_enable_capture(self.pg_interfaces)
594 # Server side - verify ICMP type 11 packets
595 capture = self.pg1.get_capture(len(pkts))
596 self.verify_capture_out_with_icmp_errors(capture,
597 src_ip=self.pg1.local_ip4)
599 def test_dynamic_icmp_errors_in2out_ttl_2(self):
600 """ SNAT handling of error responses to client packets with TTL=2 """
602 self.snat_add_address(self.snat_addr)
603 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
604 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
607 # Client side - generate traffic
608 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
609 self.pg0.add_stream(pkts)
610 self.pg_enable_capture(self.pg_interfaces)
613 # Server side - simulate ICMP type 11 response
614 capture = self.pg1.get_capture(len(pkts))
615 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
616 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
617 ICMP(type=11) / packet[IP] for packet in capture]
618 self.pg1.add_stream(pkts)
619 self.pg_enable_capture(self.pg_interfaces)
622 # Client side - verify ICMP type 11 packets
623 capture = self.pg0.get_capture(len(pkts))
624 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
626 def test_dynamic_icmp_errors_out2in_ttl_2(self):
627 """ SNAT handling of error responses to server packets with TTL=2 """
629 self.snat_add_address(self.snat_addr)
630 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
631 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
634 # Client side - create sessions
635 pkts = self.create_stream_in(self.pg0, self.pg1)
636 self.pg0.add_stream(pkts)
637 self.pg_enable_capture(self.pg_interfaces)
640 # Server side - generate traffic
641 capture = self.pg1.get_capture(len(pkts))
642 self.verify_capture_out(capture)
643 pkts = self.create_stream_out(self.pg1, ttl=2)
644 self.pg1.add_stream(pkts)
645 self.pg_enable_capture(self.pg_interfaces)
648 # Client side - simulate ICMP type 11 response
649 capture = self.pg0.get_capture(len(pkts))
650 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
651 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
652 ICMP(type=11) / packet[IP] for packet in capture]
653 self.pg0.add_stream(pkts)
654 self.pg_enable_capture(self.pg_interfaces)
657 # Server side - verify ICMP type 11 packets
658 capture = self.pg1.get_capture(len(pkts))
659 self.verify_capture_out_with_icmp_errors(capture)
661 def test_ping_out_interface_from_outside(self):
662 """ Ping SNAT out interface from outside network """
664 self.snat_add_address(self.snat_addr)
665 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
666 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
669 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
670 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
671 ICMP(id=self.icmp_id_out, type='echo-request'))
673 self.pg1.add_stream(pkts)
674 self.pg_enable_capture(self.pg_interfaces)
676 capture = self.pg1.get_capture(len(pkts))
677 self.assertEqual(1, len(capture))
680 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
681 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
682 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
683 self.assertEqual(packet[ICMP].type, 0) # echo reply
685 self.logger.error(ppp("Unexpected or invalid packet "
686 "(outside network):", packet))
689 def test_ping_internal_host_from_outside(self):
690 """ Ping internal host from outside network """
692 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
693 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
694 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
698 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
699 IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
700 ICMP(id=self.icmp_id_out, type='echo-request'))
701 self.pg1.add_stream(pkt)
702 self.pg_enable_capture(self.pg_interfaces)
704 capture = self.pg0.get_capture(1)
705 self.verify_capture_in(capture, self.pg0, packet_num=1)
706 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
709 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
710 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
711 ICMP(id=self.icmp_id_in, type='echo-reply'))
712 self.pg0.add_stream(pkt)
713 self.pg_enable_capture(self.pg_interfaces)
715 capture = self.pg1.get_capture(1)
716 self.verify_capture_out(capture, same_port=True, packet_num=1)
717 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
719 def test_static_in(self):
720 """ SNAT 1:1 NAT initialized from inside network """
723 self.tcp_port_out = 6303
724 self.udp_port_out = 6304
725 self.icmp_id_out = 6305
727 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
728 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
729 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
733 pkts = self.create_stream_in(self.pg0, self.pg1)
734 self.pg0.add_stream(pkts)
735 self.pg_enable_capture(self.pg_interfaces)
737 capture = self.pg1.get_capture(len(pkts))
738 self.verify_capture_out(capture, nat_ip, True)
741 pkts = self.create_stream_out(self.pg1, nat_ip)
742 self.pg1.add_stream(pkts)
743 self.pg_enable_capture(self.pg_interfaces)
745 capture = self.pg0.get_capture(len(pkts))
746 self.verify_capture_in(capture, self.pg0)
748 def test_static_out(self):
749 """ SNAT 1:1 NAT initialized from outside network """
752 self.tcp_port_out = 6303
753 self.udp_port_out = 6304
754 self.icmp_id_out = 6305
756 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
757 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
758 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
762 pkts = self.create_stream_out(self.pg1, nat_ip)
763 self.pg1.add_stream(pkts)
764 self.pg_enable_capture(self.pg_interfaces)
766 capture = self.pg0.get_capture(len(pkts))
767 self.verify_capture_in(capture, self.pg0)
770 pkts = self.create_stream_in(self.pg0, self.pg1)
771 self.pg0.add_stream(pkts)
772 self.pg_enable_capture(self.pg_interfaces)
774 capture = self.pg1.get_capture(len(pkts))
775 self.verify_capture_out(capture, nat_ip, True)
777 def test_static_with_port_in(self):
778 """ SNAT 1:1 NAT with port initialized from inside network """
780 self.tcp_port_out = 3606
781 self.udp_port_out = 3607
782 self.icmp_id_out = 3608
784 self.snat_add_address(self.snat_addr)
785 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
786 self.tcp_port_in, self.tcp_port_out,
788 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
789 self.udp_port_in, self.udp_port_out,
791 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
792 self.icmp_id_in, self.icmp_id_out,
793 proto=IP_PROTOS.icmp)
794 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
795 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
799 pkts = self.create_stream_in(self.pg0, self.pg1)
800 self.pg0.add_stream(pkts)
801 self.pg_enable_capture(self.pg_interfaces)
803 capture = self.pg1.get_capture(len(pkts))
804 self.verify_capture_out(capture)
807 pkts = self.create_stream_out(self.pg1)
808 self.pg1.add_stream(pkts)
809 self.pg_enable_capture(self.pg_interfaces)
811 capture = self.pg0.get_capture(len(pkts))
812 self.verify_capture_in(capture, self.pg0)
814 def test_static_with_port_out(self):
815 """ SNAT 1:1 NAT with port initialized from outside network """
817 self.tcp_port_out = 30606
818 self.udp_port_out = 30607
819 self.icmp_id_out = 30608
821 self.snat_add_address(self.snat_addr)
822 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
823 self.tcp_port_in, self.tcp_port_out,
825 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
826 self.udp_port_in, self.udp_port_out,
828 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
829 self.icmp_id_in, self.icmp_id_out,
830 proto=IP_PROTOS.icmp)
831 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
832 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
836 pkts = self.create_stream_out(self.pg1)
837 self.pg1.add_stream(pkts)
838 self.pg_enable_capture(self.pg_interfaces)
840 capture = self.pg0.get_capture(len(pkts))
841 self.verify_capture_in(capture, self.pg0)
844 pkts = self.create_stream_in(self.pg0, self.pg1)
845 self.pg0.add_stream(pkts)
846 self.pg_enable_capture(self.pg_interfaces)
848 capture = self.pg1.get_capture(len(pkts))
849 self.verify_capture_out(capture)
851 def test_static_vrf_aware(self):
852 """ SNAT 1:1 NAT VRF awareness """
854 nat_ip1 = "10.0.0.30"
855 nat_ip2 = "10.0.0.40"
856 self.tcp_port_out = 6303
857 self.udp_port_out = 6304
858 self.icmp_id_out = 6305
860 self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
862 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
864 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
866 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
867 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
869 # inside interface VRF match SNAT static mapping VRF
870 pkts = self.create_stream_in(self.pg4, self.pg3)
871 self.pg4.add_stream(pkts)
872 self.pg_enable_capture(self.pg_interfaces)
874 capture = self.pg3.get_capture(len(pkts))
875 self.verify_capture_out(capture, nat_ip1, True)
877 # inside interface VRF doesn't match SNAT static mapping VRF (packets
879 pkts = self.create_stream_in(self.pg0, self.pg3)
880 self.pg0.add_stream(pkts)
881 self.pg_enable_capture(self.pg_interfaces)
883 self.pg3.assert_nothing_captured()
885 def test_multiple_inside_interfaces(self):
886 """ SNAT multiple inside interfaces (non-overlapping address space) """
888 self.snat_add_address(self.snat_addr)
889 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
890 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
891 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
894 # between two S-NAT inside interfaces (no translation)
895 pkts = self.create_stream_in(self.pg0, self.pg1)
896 self.pg0.add_stream(pkts)
897 self.pg_enable_capture(self.pg_interfaces)
899 capture = self.pg1.get_capture(len(pkts))
900 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
902 # from S-NAT inside to interface without S-NAT feature (no translation)
903 pkts = self.create_stream_in(self.pg0, self.pg2)
904 self.pg0.add_stream(pkts)
905 self.pg_enable_capture(self.pg_interfaces)
907 capture = self.pg2.get_capture(len(pkts))
908 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
910 # in2out 1st interface
911 pkts = self.create_stream_in(self.pg0, self.pg3)
912 self.pg0.add_stream(pkts)
913 self.pg_enable_capture(self.pg_interfaces)
915 capture = self.pg3.get_capture(len(pkts))
916 self.verify_capture_out(capture)
918 # out2in 1st interface
919 pkts = self.create_stream_out(self.pg3)
920 self.pg3.add_stream(pkts)
921 self.pg_enable_capture(self.pg_interfaces)
923 capture = self.pg0.get_capture(len(pkts))
924 self.verify_capture_in(capture, self.pg0)
926 # in2out 2nd interface
927 pkts = self.create_stream_in(self.pg1, self.pg3)
928 self.pg1.add_stream(pkts)
929 self.pg_enable_capture(self.pg_interfaces)
931 capture = self.pg3.get_capture(len(pkts))
932 self.verify_capture_out(capture)
934 # out2in 2nd interface
935 pkts = self.create_stream_out(self.pg3)
936 self.pg3.add_stream(pkts)
937 self.pg_enable_capture(self.pg_interfaces)
939 capture = self.pg1.get_capture(len(pkts))
940 self.verify_capture_in(capture, self.pg1)
942 def test_inside_overlapping_interfaces(self):
943 """ SNAT multiple inside interfaces with overlapping address space """
945 static_nat_ip = "10.0.0.10"
946 self.snat_add_address(self.snat_addr)
947 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
949 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
950 self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
951 self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
952 self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
955 # between S-NAT inside interfaces with same VRF (no translation)
956 pkts = self.create_stream_in(self.pg4, self.pg5)
957 self.pg4.add_stream(pkts)
958 self.pg_enable_capture(self.pg_interfaces)
960 capture = self.pg5.get_capture(len(pkts))
961 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
963 # between S-NAT inside interfaces with different VRF (hairpinning)
964 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
965 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
966 TCP(sport=1234, dport=5678))
967 self.pg4.add_stream(p)
968 self.pg_enable_capture(self.pg_interfaces)
970 capture = self.pg6.get_capture(1)
975 self.assertEqual(ip.src, self.snat_addr)
976 self.assertEqual(ip.dst, self.pg6.remote_ip4)
977 self.assertNotEqual(tcp.sport, 1234)
978 self.assertEqual(tcp.dport, 5678)
980 self.logger.error(ppp("Unexpected or invalid packet:", p))
983 # in2out 1st interface
984 pkts = self.create_stream_in(self.pg4, self.pg3)
985 self.pg4.add_stream(pkts)
986 self.pg_enable_capture(self.pg_interfaces)
988 capture = self.pg3.get_capture(len(pkts))
989 self.verify_capture_out(capture)
991 # out2in 1st interface
992 pkts = self.create_stream_out(self.pg3)
993 self.pg3.add_stream(pkts)
994 self.pg_enable_capture(self.pg_interfaces)
996 capture = self.pg4.get_capture(len(pkts))
997 self.verify_capture_in(capture, self.pg4)
999 # in2out 2nd interface
1000 pkts = self.create_stream_in(self.pg5, self.pg3)
1001 self.pg5.add_stream(pkts)
1002 self.pg_enable_capture(self.pg_interfaces)
1004 capture = self.pg3.get_capture(len(pkts))
1005 self.verify_capture_out(capture)
1007 # out2in 2nd interface
1008 pkts = self.create_stream_out(self.pg3)
1009 self.pg3.add_stream(pkts)
1010 self.pg_enable_capture(self.pg_interfaces)
1012 capture = self.pg5.get_capture(len(pkts))
1013 self.verify_capture_in(capture, self.pg5)
1016 addresses = self.vapi.snat_address_dump()
1017 self.assertEqual(len(addresses), 1)
1018 sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
1019 self.assertEqual(len(sessions), 3)
1020 for session in sessions:
1021 self.assertFalse(session.is_static)
1022 self.assertEqual(session.inside_ip_address[0:4],
1023 self.pg5.remote_ip4n)
1024 self.assertEqual(session.outside_ip_address,
1025 addresses[0].ip_address)
1026 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1027 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1028 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1029 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1030 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1031 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1032 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1033 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1034 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1036 # in2out 3rd interface
1037 pkts = self.create_stream_in(self.pg6, self.pg3)
1038 self.pg6.add_stream(pkts)
1039 self.pg_enable_capture(self.pg_interfaces)
1041 capture = self.pg3.get_capture(len(pkts))
1042 self.verify_capture_out(capture, static_nat_ip, True)
1044 # out2in 3rd interface
1045 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1046 self.pg3.add_stream(pkts)
1047 self.pg_enable_capture(self.pg_interfaces)
1049 capture = self.pg6.get_capture(len(pkts))
1050 self.verify_capture_in(capture, self.pg6)
1052 # general user and session dump verifications
1053 users = self.vapi.snat_user_dump()
1054 self.assertTrue(len(users) >= 3)
1055 addresses = self.vapi.snat_address_dump()
1056 self.assertEqual(len(addresses), 1)
1058 sessions = self.vapi.snat_user_session_dump(user.ip_address,
1060 for session in sessions:
1061 self.assertEqual(user.ip_address, session.inside_ip_address)
1062 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1063 self.assertTrue(session.protocol in
1064 [IP_PROTOS.tcp, IP_PROTOS.udp,
1068 sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
1069 self.assertTrue(len(sessions) >= 4)
1070 for session in sessions:
1071 self.assertFalse(session.is_static)
1072 self.assertEqual(session.inside_ip_address[0:4],
1073 self.pg4.remote_ip4n)
1074 self.assertEqual(session.outside_ip_address,
1075 addresses[0].ip_address)
1078 sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1079 self.assertTrue(len(sessions) >= 3)
1080 for session in sessions:
1081 self.assertTrue(session.is_static)
1082 self.assertEqual(session.inside_ip_address[0:4],
1083 self.pg6.remote_ip4n)
1084 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1085 map(int, static_nat_ip.split('.')))
1086 self.assertTrue(session.inside_port in
1087 [self.tcp_port_in, self.udp_port_in,
def test_hairpinning(self):
    """ SNAT hairpinning - 1:1 NAT with port

    Host and server are both remote hosts on pg0.  The host addresses the
    server through the server's static mapping on the SNAT address, so
    each packet is injected on pg0 and the translated packet is captured
    on pg0 again.
    """

    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): the initialiser of host_in_port is not legible in the
    # damaged listing; 1234 is a restored value - confirm.
    host_in_port = 1234
    server_in_port = 5678
    server_out_port = 8765

    def hairpin(pkt):
        """Inject pkt on pg0 and return the single packet pg0 gets back."""
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return self.pg0.get_capture(1)[0]

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # add static mapping for server
    self.snat_add_static_mapping(server.ip4, self.snat_addr,
                                 server_in_port, server_out_port,
                                 proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = hairpin(Ether(src=host.mac, dst=self.pg0.local_mac) /
                IP(src=host.ip4, dst=self.snat_addr) /
                TCP(sport=host_in_port, dport=server_out_port))
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.check_tcp_checksum(p)
        # remember the translated source port; the reply must target it
        host_out_port = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = hairpin(Ether(src=server.mac, dst=self.pg0.local_mac) /
                IP(src=server.ip4, dst=self.snat_addr) /
                TCP(sport=server_in_port, dport=host_out_port))
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.check_tcp_checksum(p)
    except Exception:
        # the packet belongs inside ppp(); passing it to logger.error()
        # instead made the failure handler itself raise TypeError
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_hairpinning2(self):
    """ SNAT hairpinning - 1:1 NAT

    Runs four exchanges over pg0, each with one TCP, one UDP and one ICMP
    packet: host -> server1, server1 -> host, server2 -> server1 and
    server1 -> server2.  Servers are reached through their 1:1 static
    mappings.
    """

    server1_nat_ip = "10.0.0.10"
    server2_nat_ip = "10.0.0.11"
    host = self.pg0.remote_hosts[0]
    server1 = self.pg0.remote_hosts[1]
    server2 = self.pg0.remote_hosts[2]
    server_tcp_port = 22
    server_udp_port = 20

    def build_stream(src_ip, dst_ip, tcp, udp, icmp):
        """Return [TCP, UDP, ICMP] packets from src_ip to dst_ip.

        :param tcp: (sport, dport) of the TCP packet
        :param udp: (sport, dport) of the UDP packet
        :param icmp: (id, type) of the ICMP packet
        """
        l4_headers = (TCP(sport=tcp[0], dport=tcp[1]),
                      UDP(sport=udp[0], dport=udp[1]),
                      ICMP(id=icmp[0], type=icmp[1]))
        return [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                IP(src=src_ip, dst=dst_ip) /
                l4
                for l4 in l4_headers]

    def exchange(pkts, verify):
        """Send pkts on pg0 and run verify() on every packet pg0 receives."""
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        for packet in self.pg0.get_capture(len(pkts)):
            try:
                verify(packet)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:",
                                      packet))
                raise

    def request_verifier(expected_src, translated):
        """Build the per-packet check for traffic heading to server1.

        :param expected_src: source address the captured packet must carry
        :param translated: True when source port / ICMP id must differ
            from the inside value, False when it must be kept
        """
        compare = self.assertNotEqual if translated else self.assertEqual

        def verify(packet):
            self.assertEqual(packet[IP].src, expected_src)
            self.assertEqual(packet[IP].dst, server1.ip4)
            if packet.haslayer(TCP):
                compare(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                compare(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                compare(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        return verify

    def reply_verifier(expected_dst):
        """Build the per-packet check for replies sent by server1."""
        def verify(packet):
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, expected_dst)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        return verify

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # add static mapping for servers
    self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
    self.snat_add_static_mapping(server2.ip4, server2_nat_ip)

    # host to server1
    exchange(build_stream(host.ip4, server1_nat_ip,
                          (self.tcp_port_in, server_tcp_port),
                          (self.udp_port_in, server_udp_port),
                          (self.icmp_id_in, 'echo-request')),
             request_verifier(self.snat_addr, translated=True))

    # server1 to host (uses the outside ports learned above)
    exchange(build_stream(server1.ip4, self.snat_addr,
                          (server_tcp_port, self.tcp_port_out),
                          (server_udp_port, self.udp_port_out),
                          (self.icmp_id_out, 'echo-reply')),
             reply_verifier(host.ip4))

    # server2 to server1
    exchange(build_stream(server2.ip4, server1_nat_ip,
                          (self.tcp_port_in, server_tcp_port),
                          (self.udp_port_in, server_udp_port),
                          (self.icmp_id_in, 'echo-request')),
             request_verifier(server2_nat_ip, translated=False))

    # server1 to server2
    exchange(build_stream(server1.ip4, server2_nat_ip,
                          (server_tcp_port, self.tcp_port_out),
                          (server_udp_port, self.udp_port_out),
                          (self.icmp_id_out, 'echo-reply')),
             reply_verifier(server2.ip4))
def test_max_translations_per_user(self):
    """ MAX translations per user - recycle the least recently used

    Sends five more TCP flows (distinct source ports) from one inside
    host than the configured per-user maximum and expects every packet
    to show up on the outside interface.
    """

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # get maximum number of translations per user
    snat_config = self.vapi.snat_show_config()

    # send more than maximum number of translations per user packets
    pkts_num = snat_config.max_translations_per_user + 5
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
            TCP(sport=1025 + offset)
            for offset in range(pkts_num)]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # verify number of translated packet
    self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire SNAT addresses from interface

    Registers pg7 as an address source and checks that the SNAT address
    dump follows the interface: empty while pg7 has no IPv4 address, one
    entry equal to pg7's local address once configured, empty again after
    the address is removed.
    """
    self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)

    # no address in NAT pool
    addresses = self.vapi.snat_address_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT address pool
    self.pg7.config_ip4()
    addresses = self.vapi.snat_address_dump()
    self.assertEqual(1, len(addresses))
    # only the first four bytes of the dumped address are compared
    self.assertEqual(addresses[0].ip_address[0:4], self.pg7.local_ip4n)

    # remove interface address and check NAT address pool
    self.pg7.unconfig_ip4()
    addresses = self.vapi.snat_address_dump()
    self.assertEqual(0, len(addresses))
def test_interface_addr_static_mapping(self):
    """ Static mapping with addresses from interface

    Creates a static mapping for 1.2.3.4 whose external side is given as
    interface pg7 and checks the static mapping dump before pg7 has an
    address, after it gets one, and after the address is removed.
    """
    ext_if = self.pg7
    self.vapi.snat_add_interface_addr(ext_if.sw_if_index)
    self.snat_add_static_mapping('1.2.3.4',
                                 external_sw_if_index=ext_if.sw_if_index)

    # static mappings with external interface
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(1, len(mappings))
    self.assertEqual(ext_if.sw_if_index, mappings[0].external_sw_if_index)

    # configure interface address and check static mappings
    ext_if.config_ip4()
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(1, len(mappings))
    self.assertEqual(mappings[0].external_ip_address[0:4],
                     ext_if.local_ip4n)
    # the dump now reports 0xFFFFFFFF instead of pg7's index
    self.assertEqual(0xFFFFFFFF, mappings[0].external_sw_if_index)

    # remove interface address and check static mappings
    ext_if.unconfig_ip4()
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(0, len(mappings))
def test_ipfix_nat44_sess(self):
    """ S-NAT IPFIX logging NAT44 session created/deleted

    Translates one in2out stream, removes the SNAT address, flushes the
    IPFIX exporter and checks the three packets captured on pg3: common
    header fields on every packet, then the decoded data sets.
    """
    self.ipfix_domain_id = 10
    self.ipfix_src_port = 20202
    collector_port = 30303
    # make scapy dissect UDP traffic to the collector port as IPFIX
    bind_layers(UDP, IPFIX, dport=collector_port)
    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # NOTE(review): one keyword argument line of this call is not legible
    # in the damaged listing; path_mtu=512 is a restored value - confirm.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10,
                                 collector_port=collector_port)
    self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
                         src_port=self.ipfix_src_port)

    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    self.snat_add_address(self.snat_addr, is_add=0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(3)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, collector_port)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_nat44_ses(data)
def test_ipfix_addr_exhausted(self):
    """ S-NAT IPFIX logging NAT addresses exhausted

    No SNAT address is configured, so the in2out packet must not be
    forwarded to pg1; the IPFIX packets captured on pg3 are then checked
    and their data sets passed to verify_ipfix_addr_exhausted().
    """
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # NOTE(review): one keyword argument line of this call is not legible
    # in the damaged listing; path_mtu=512 is a restored value - confirm.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
                         src_port=self.ipfix_src_port)

    # NOTE(review): the transport layer of this packet is not legible in
    # the damaged listing; TCP(sport=3025) is a restored value - confirm.
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=3025))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # nothing may reach the outside interface
    self.pg1.get_capture(0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(3)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_addr_exhausted(data)
def test_pool_addr_fib(self):
    """ S-NAT add pool addresses to FIB

    Checks that ARP requests arriving on pg1 for the SNAT pool address
    and for a 1:1 static address are answered while those addresses are
    configured, and that nothing is captured once they are removed.
    """
    static_addr = '10.0.0.10'

    def arp_request(pg_if, target_ip):
        """Broadcast an ARP who-has for target_ip from pg_if's remote end."""
        request = (Ether(src=pg_if.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
                   ARP(op=ARP.who_has, pdst=target_ip,
                       psrc=pg_if.remote_ip4, hwsrc=pg_if.remote_mac))
        pg_if.add_stream(request)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)

    # SNAT address, then 1:1 NAT address
    for address in (self.snat_addr, static_addr):
        arp_request(self.pg1, address)
        capture = self.pg1.get_capture(1)
        self.assertTrue(capture[0].haslayer(ARP))
        # assertTrue(op, ARP.is_at) treated ARP.is_at as the failure
        # message and never compared it; compare the opcode explicitly
        self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # send ARP to non-SNAT interface
    # NOTE(review): the request is sent on pg2 but the empty capture is
    # checked on pg1, as in the original - confirm this is intended.
    arp_request(self.pg2, self.snat_addr)
    self.pg1.get_capture(0)

    # remove addresses and verify
    self.snat_add_address(self.snat_addr, is_add=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                 is_add=0)

    for address in (self.snat_addr, static_addr):
        arp_request(self.pg1, address)
        self.pg1.get_capture(0)
def test_vrf_mode(self):
    """ S-NAT tenant VRF aware address pool mode

    pg0 and pg1 are moved into two different IPv4 tables and each table
    gets its own SNAT address; traffic from pg0 must leave pg2 with the
    first address, traffic from pg1 with the second.
    """

    # NOTE(review): the VRF id definitions are not legible in the damaged
    # listing; 1 and 2 are restored values - confirm.
    vrf_id1 = 1
    vrf_id2 = 2
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    # addresses are removed before the table change and re-applied after
    self.pg0.unconfig_ip4()
    self.pg1.unconfig_ip4()
    self.pg0.set_table_ip4(vrf_id1)
    self.pg1.set_table_ip4(vrf_id2)
    self.pg0.config_ip4()
    self.pg1.config_ip4()

    self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
    self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
                                             is_inside=0)

    # first VRF, then second VRF
    for inside_if, expected_ip in ((self.pg0, nat_ip1),
                                   (self.pg1, nat_ip2)):
        pkts = self.create_stream_in(inside_if, self.pg2)
        inside_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        self.verify_capture_out(capture, expected_ip)
def test_vrf_feature_independent(self):
    """ S-NAT tenant VRF independent address pool mode

    Two SNAT addresses are added without a VRF; traffic from both inside
    interfaces (pg0 and pg1) is expected to be translated to the first
    address.
    """

    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    self.snat_add_address(nat_ip1)
    self.snat_add_address(nat_ip2)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
                                             is_inside=0)

    # first VRF, then second VRF - both must use nat_ip1
    for inside_if in (self.pg0, self.pg1):
        pkts = self.create_stream_in(inside_if, self.pg2)
        inside_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip1)
def test_dynamic_ipless_interfaces(self):
    """ SNAT interfaces without configured ip dynamic map

    pg7 and pg8 are reached through explicitly added neighbour entries
    and /32 routes; dynamic translation is then checked in both
    directions.
    """

    ipless_ifs = (self.pg7, self.pg8)

    # NOTE(review): the last argument of the neighbour calls is not
    # legible in the damaged listing; is_static=1 is restored - confirm.
    for pg_if in ipless_ifs:
        self.vapi.ip_neighbor_add_del(pg_if.sw_if_index,
                                      pg_if.remote_mac,
                                      pg_if.remote_ip4n,
                                      is_static=1)

    # host route to each remote address through its own interface
    for pg_if in ipless_ifs:
        self.vapi.ip_add_del_route(dst_address=pg_if.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg_if.remote_ip4n,
                                   next_hop_sw_if_index=pg_if.sw_if_index)

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg8, self.snat_addr)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)
def test_static_ipless_interfaces(self):
    """ SNAT 1:1 NAT interfaces without configured ip

    pg7 and pg8 are reached through explicitly added neighbour entries
    and /32 routes; a 1:1 static mapping for pg7's remote address is then
    checked out2in first and in2out second.
    """

    ipless_ifs = (self.pg7, self.pg8)

    # NOTE(review): the last argument of the neighbour calls is not
    # legible in the damaged listing; is_static=1 is restored - confirm.
    for pg_if in ipless_ifs:
        self.vapi.ip_neighbor_add_del(pg_if.sw_if_index,
                                      pg_if.remote_mac,
                                      pg_if.remote_ip4n,
                                      is_static=1)

    # host route to each remote address through its own interface
    for pg_if in ipless_ifs:
        self.vapi.ip_add_del_route(dst_address=pg_if.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg_if.remote_ip4n,
                                   next_hop_sw_if_index=pg_if.sw_if_index)

    self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg8)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture, self.snat_addr, True)
def test_static_with_port_ipless_interfaces(self):
    """ SNAT 1:1 NAT with port interfaces without configured ip

    pg7 and pg8 are reached through explicitly added neighbour entries
    and /32 routes; per-protocol static mappings with fixed outside
    ports/id are then checked out2in first and in2out second.
    """

    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    ipless_ifs = (self.pg7, self.pg8)

    # NOTE(review): the last argument of the neighbour calls is not
    # legible in the damaged listing; is_static=1 is restored - confirm.
    for pg_if in ipless_ifs:
        self.vapi.ip_neighbor_add_del(pg_if.sw_if_index,
                                      pg_if.remote_mac,
                                      pg_if.remote_ip4n,
                                      is_static=1)

    # host route to each remote address through its own interface
    for pg_if in ipless_ifs:
        self.vapi.ip_add_del_route(dst_address=pg_if.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg_if.remote_ip4n,
                                   next_hop_sw_if_index=pg_if.sw_if_index)

    self.snat_add_address(self.snat_addr)
    # one mapping per protocol: (inside port/id, outside port/id, proto)
    port_mappings = (
        (self.tcp_port_in, self.tcp_port_out, IP_PROTOS.tcp),
        (self.udp_port_in, self.udp_port_out, IP_PROTOS.udp),
        (self.icmp_id_in, self.icmp_id_out, IP_PROTOS.icmp),
    )
    for inside_port, outside_port, proto in port_mappings:
        self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
                                     inside_port, outside_port,
                                     proto=proto)
    self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
                                             is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg8)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture)
def tearDown(self):
    """Run the base-class teardown, then log the SNAT state if VPP is
    still alive."""
    super(TestSNAT, self).tearDown()
    if not self.vpp_dead:
        self.logger.info(self.vapi.cli("show snat verbose"))
        # NOTE(review): the listing drops one line after the log call;
        # a clear_snat() cleanup is restored here - confirm.
        self.clear_snat()
class TestDeterministicNAT(MethodHolder):
    """ Deterministic NAT Test Cases """

    @classmethod
    def setUpConstants(cls):
        """Extend the VPP command line with 'snat { deterministic }'."""
        super(TestDeterministicNAT, cls).setUpConstants()
        cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])

    @classmethod
    def setUpClass(cls):
        """Set the port/id/address constants used by the tests and bring
        up three pg interfaces; on any failure the base-class
        tearDownClass() is run before the exception is re-raised."""
        super(TestDeterministicNAT, cls).setUpClass()

        try:
            cls.tcp_port_in = 6303
            cls.tcp_external_port = 6303
            cls.udp_port_in = 6304
            cls.udp_external_port = 6304
            cls.icmp_id_in = 6305
            cls.snat_addr = '10.0.0.3'

            cls.create_pg_interfaces(range(3))
            cls.interfaces = list(cls.pg_interfaces)

            # NOTE(review): the loop body is not legible in the damaged
            # listing; the usual bring-up sequence is restored - confirm.
            for i in cls.interfaces:
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()

            cls.pg0.generate_remote_hosts(2)
            cls.pg0.configure_ipv4_neighbors()

        except Exception:
            super(TestDeterministicNAT, cls).tearDownClass()
            raise
def create_stream_in(self, in_if, out_if, ttl=64):
    """
    Create packet stream for inside network

    Builds one TCP, one UDP and one ICMP echo-request packet addressed
    from the inside interface's remote host to the outside interface's
    remote host.

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param ttl: TTL of generated packets
    :returns: list with the TCP, UDP and ICMP packet, in that order
    """
    l4_headers = (
        TCP(sport=self.tcp_port_in, dport=self.tcp_external_port),
        UDP(sport=self.udp_port_in, dport=self.udp_external_port),
        ICMP(id=self.icmp_id_in, type='echo-request'),
    )
    return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
            IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
            l4
            for l4 in l4_headers]
def create_stream_out(self, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for outside network

    Builds one TCP, one UDP and one ICMP echo-reply packet from the
    outside interface's remote host towards dst_ip, using the outside
    ports / ICMP id currently stored on the test case.

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global SNAT address)
    :param ttl: TTL of generated packets
    :returns: list with the TCP, UDP and ICMP packet, in that order
    """
    if dst_ip is None:
        dst_ip = self.snat_addr
    l4_headers = (
        TCP(dport=self.tcp_port_out, sport=self.tcp_external_port),
        UDP(dport=self.udp_port_out, sport=self.udp_external_port),
        ICMP(id=self.icmp_external_id, type='echo-reply'),
    )
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
            l4
            for l4 in l4_headers]
def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
    """
    Verify captured packets on outside network

    Besides checking the source address, this records the source port of
    the TCP and UDP packet and the id of the ICMP packet on the test case
    (tcp_port_out, udp_port_out, icmp_external_id).

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global SNAT address)
    :param packet_num: Expected number of packets (Default 3)
    """
    if nat_ip is None:
        nat_ip = self.snat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, nat_ip)
            if packet.haslayer(TCP):
                self.tcp_port_out = packet[TCP].sport
            elif packet.haslayer(UDP):
                self.udp_port_out = packet[UDP].sport
            else:
                self.icmp_external_id = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def initiate_tcp_session(self, in_if, out_if):
    """
    Initiates TCP session

    Sends SYN (in->out), SYN+ACK (out->in) and ACK (in->out) and expects
    one packet on the opposite interface each time.  The source port of
    the translated SYN is stored in self.tcp_port_out.

    :param in_if: Inside interface
    :param out_if: Outside interface
    """
    def send_segment(tx_if, rx_if, dst_ip, sport, dport, flags):
        """Send one TCP segment from tx_if's remote host and return the
        single-packet capture taken on rx_if."""
        segment = (Ether(src=tx_if.remote_mac, dst=tx_if.local_mac) /
                   IP(src=tx_if.remote_ip4, dst=dst_ip) /
                   TCP(sport=sport, dport=dport, flags=flags))
        tx_if.add_stream(segment)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)

    try:
        # SYN packet in->out
        capture = send_segment(in_if, out_if, out_if.remote_ip4,
                               self.tcp_port_in, self.tcp_external_port,
                               "S")
        self.tcp_port_out = capture[0][TCP].sport

        # SYN + ACK packet out->in
        send_segment(out_if, in_if, self.snat_addr,
                     self.tcp_external_port, self.tcp_port_out, "SA")

        # ACK packet in->out
        send_segment(in_if, out_if, out_if.remote_ip4,
                     self.tcp_port_in, self.tcp_external_port, "A")

    except Exception:
        self.logger.error("TCP 3 way handshake failed")
        raise
def verify_ipfix_max_entries_per_user(self, data):
    """
    Verify IPFIX maximum entries per user exceeded event

    Expects exactly one record and checks three of its fields, looked up
    by numeric key: 230, 466 and 8.

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # field 230 holds a single character whose code must be 13
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    self.assertEqual('\x03\x00\x00\x00', record[466])
    # field 8 must equal the inside host's packed IPv4 address
    self.assertEqual(self.pg0.remote_ip4n, record[8])
def test_deterministic_mode(self):
    """ S-NAT run deterministic mode

    Adds one deterministic mapping, checks the forward and reverse
    lookups and the mapping dump, then checks the dump is empty after
    cleanup.
    """
    in_addr = '172.16.255.0'
    out_addr = '172.17.255.50'
    in_addr_t = '172.16.255.20'
    in_addr_n = socket.inet_aton(in_addr)
    out_addr_n = socket.inet_aton(out_addr)
    in_addr_t_n = socket.inet_aton(in_addr_t)
    # NOTE(review): the prefix length definitions are not legible in the
    # damaged listing; 24 / 32 are restored values - confirm.
    in_plen = 24
    out_plen = 32

    snat_config = self.vapi.snat_show_config()
    self.assertEqual(1, snat_config.deterministic)

    self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)

    # forward lookup of an inside address, then reverse lookup of the
    # outside address with the upper port returned by the forward lookup
    rep1 = self.vapi.snat_det_forward(in_addr_t_n)
    self.assertEqual(rep1.out_addr[:4], out_addr_n)
    rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
    self.assertEqual(rep2.in_addr[:4], in_addr_t_n)

    deterministic_mappings = self.vapi.snat_det_map_dump()
    self.assertEqual(len(deterministic_mappings), 1)
    dsm = deterministic_mappings[0]
    self.assertEqual(in_addr_n, dsm.in_addr[:4])
    self.assertEqual(in_plen, dsm.in_plen)
    self.assertEqual(out_addr_n, dsm.out_addr[:4])
    self.assertEqual(out_plen, dsm.out_plen)

    # NOTE(review): the cleanup statement is not legible in the damaged
    # listing; clear_snat() is restored - confirm.
    self.clear_snat()
    deterministic_mappings = self.vapi.snat_det_map_dump()
    self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts

    Raises every timeout by 10 and checks that each value read back
    differs from the one read before the change.
    """
    before = self.vapi.snat_det_get_timeouts()

    self.vapi.snat_det_set_timeouts(before.udp + 10,
                                    before.tcp_established + 10,
                                    before.tcp_transitory + 10,
                                    before.icmp + 10)

    after = self.vapi.snat_det_get_timeouts()

    for field in ('udp', 'icmp', 'tcp_established', 'tcp_transitory'):
        self.assertNotEqual(getattr(before, field), getattr(after, field))
def test_det_in(self):
    """ CGNAT translation test (TCP, UDP, ICMP)

    Translates one stream in each direction through a deterministic
    mapping and then checks the three sessions reported for the inside
    host.
    """

    nat_ip = "10.0.0.10"

    # NOTE(review): both prefix lengths are not legible in the damaged
    # listing; 32 / 32 are restored values - confirm.
    self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
                               32,
                               socket.inet_aton(nat_ip),
                               32)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # session dump test
    sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
    self.assertEqual(len(sessions), 3)

    # TCP session, then UDP session
    expected_l4 = (
        (self.tcp_port_in, self.tcp_port_out, self.tcp_external_port),
        (self.udp_port_in, self.udp_port_out, self.udp_external_port),
    )
    for s, (in_port, out_port, ext_port) in zip(sessions, expected_l4):
        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
        self.assertEqual(s.in_port, in_port)
        self.assertEqual(s.out_port, out_port)
        self.assertEqual(s.ext_port, ext_port)

    # ICMP session (no external port is checked)
    s = sessions[2]
    self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
    self.assertEqual(s.in_port, self.icmp_id_in)
    self.assertEqual(s.out_port, self.icmp_external_id)
def test_multiple_users(self):
    """ CGNAT multiple users

    Two inside hosts open a TCP flow from the same source port through
    one deterministic mapping.  Each flow is checked in both directions,
    then the two sessions are closed through the API (one from the
    outside view, one from the inside view) and the session counter of
    the mapping is checked after each step.
    """

    nat_ip = "10.0.0.10"
    # NOTE(review): the initialiser of port_in is not legible in the
    # damaged listing; 80 is a restored value - confirm.
    port_in = 80
    external_port = 6303

    host0 = self.pg0.remote_hosts[0]
    host1 = self.pg0.remote_hosts[1]

    def translate(tx_if, rx_if, pkt, src, dst, dport, sport=None):
        """Send pkt on tx_if, check the packet captured on rx_if and
        return its TCP source port.

        :param src: expected IP source address
        :param dst: expected IP destination address
        :param dport: expected TCP destination port
        :param sport: expected TCP source port (None = not checked)
        """
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        p = rx_if.get_capture(1)[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, src)
            self.assertEqual(ip.dst, dst)
            self.assertEqual(tcp.dport, dport)
            if sport is not None:
                self.assertEqual(tcp.sport, sport)
            return tcp.sport
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

    # NOTE(review): both prefix lengths are not legible in the damaged
    # listing; 24 / 32 are restored values - confirm.
    self.vapi.snat_add_det_map(host0.ip4n,
                               24,
                               socket.inet_aton(nat_ip),
                               32)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)

    # host0 to out
    port_out0 = translate(
        self.pg0, self.pg1,
        Ether(src=host0.mac, dst=self.pg0.local_mac) /
        IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
        TCP(sport=port_in, dport=external_port),
        src=nat_ip, dst=self.pg1.remote_ip4, dport=external_port)

    # host1 to out
    port_out1 = translate(
        self.pg0, self.pg1,
        Ether(src=host1.mac, dst=self.pg0.local_mac) /
        IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
        TCP(sport=port_in, dport=external_port),
        src=nat_ip, dst=self.pg1.remote_ip4, dport=external_port)

    dms = self.vapi.snat_det_map_dump()
    self.assertEqual(1, len(dms))
    self.assertEqual(2, dms[0].ses_num)

    # out to host0
    translate(
        self.pg1, self.pg0,
        Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
        IP(src=self.pg1.remote_ip4, dst=nat_ip) /
        TCP(sport=external_port, dport=port_out0),
        src=self.pg1.remote_ip4, dst=host0.ip4,
        dport=port_in, sport=external_port)

    # out to host1
    translate(
        self.pg1, self.pg0,
        Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
        IP(src=self.pg1.remote_ip4, dst=nat_ip) /
        TCP(sport=external_port, dport=port_out1),
        src=self.pg1.remote_ip4, dst=host1.ip4,
        dport=port_in, sport=external_port)

    # session close api test
    # NOTE(review): the port arguments of both close calls are not
    # legible in the damaged listing; they are restored - confirm.
    self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
                                         port_out1,
                                         self.pg1.remote_ip4n,
                                         external_port)
    dms = self.vapi.snat_det_map_dump()
    self.assertEqual(dms[0].ses_num, 1)

    self.vapi.snat_det_close_session_in(host0.ip4n,
                                        port_in,
                                        self.pg1.remote_ip4n,
                                        external_port)
    dms = self.vapi.snat_det_map_dump()
    self.assertEqual(dms[0].ses_num, 0)
# Open a TCP session from the inside host, then replay a close handshake that
# the inside host starts (FIN in->out, ACK + FIN out->in, ACK in->out) and
# expect the deterministic mapping to report zero sessions afterwards.
# NOTE(review): the TCP flags arguments, the construction of `pkts`, and the
# lines surrounding the final logger.error call are not visible in this
# excerpt -- confirm against the full file.
2147 def test_tcp_session_close_detection_in(self):
2148 """ CGNAT TCP session close initiated from inside network """
2149 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2151 socket.inet_aton(self.snat_addr),
2153 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2154 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2157 self.initiate_tcp_session(self.pg0, self.pg1)
2159 # close the session from inside
2161 # FIN packet in -> out
2162 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2163 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2164 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2166 self.pg0.add_stream(p)
2167 self.pg_enable_capture(self.pg_interfaces)
2169 self.pg1.get_capture(1)
# The next two packets are sent together from pg1 (see add_stream(pkts) and
# get_capture(2) below); presumably both are appended to `pkts` on elided lines.
2173 # ACK packet out -> in
2174 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2175 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2176 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2180 # FIN packet out -> in
2181 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2182 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2183 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2187 self.pg1.add_stream(pkts)
2188 self.pg_enable_capture(self.pg_interfaces)
2190 self.pg0.get_capture(2)
2192 # ACK packet in -> out
2193 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2194 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2195 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2197 self.pg0.add_stream(p)
2198 self.pg_enable_capture(self.pg_interfaces)
2200 self.pg1.get_capture(1)
2202 # Check if snat closed the session
2203 dms = self.vapi.snat_det_map_dump()
2204 self.assertEqual(0, dms[0].ses_num)
# NOTE(review): presumably this is logged from an error-handling path whose
# surrounding lines are elided here -- verify before relying on it.
2206 self.logger.error("TCP session termination failed")
# Mirror image of the inside-initiated close test: the outside host starts
# the close (FIN out->in), two packets are then sent from the inside, a final
# ACK comes from the outside, and the mapping must report zero sessions.
# NOTE(review): the TCP flags arguments, the construction of `pkts`, and the
# lines surrounding the final logger.error call are not visible in this
# excerpt -- confirm against the full file.
2209 def test_tcp_session_close_detection_out(self):
2210 """ CGNAT TCP session close initiated from outside network """
2211 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2213 socket.inet_aton(self.snat_addr),
2215 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2216 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2219 self.initiate_tcp_session(self.pg0, self.pg1)
2221 # close the session from outside
2223 # FIN packet out -> in
2224 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2225 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2226 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2228 self.pg1.add_stream(p)
2229 self.pg_enable_capture(self.pg_interfaces)
2231 self.pg0.get_capture(1)
# The next two packets are sent together from pg0 (see add_stream(pkts) and
# get_capture(2) below); presumably both are appended to `pkts` on elided lines.
2235 # ACK packet in -> out
2236 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2237 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2238 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the label below repeats "ACK"; by analogy with the
# inside-initiated test (ACK followed by FIN) this second packet is presumably
# the FIN -- confirm against the elided flags argument and fix the label.
2242 # ACK packet in -> out
2243 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2244 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2245 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2249 self.pg0.add_stream(pkts)
2250 self.pg_enable_capture(self.pg_interfaces)
2252 self.pg1.get_capture(2)
2254 # ACK packet out -> in
2255 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2256 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2257 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2259 self.pg1.add_stream(p)
2260 self.pg_enable_capture(self.pg_interfaces)
2262 self.pg0.get_capture(1)
2264 # Check if snat closed the session
2265 dms = self.vapi.snat_det_map_dump()
2266 self.assertEqual(0, dms[0].ses_num)
# NOTE(review): presumably this is logged from an error-handling path whose
# surrounding lines are elided here -- verify before relying on it.
2268 self.logger.error("TCP session termination failed")
# Extended-only test: after a TCP session is initiated, all four deterministic
# NAT timeouts are set to 5, a packet stream is pushed from pg0 to pg1, and the
# mapping is finally expected to hold zero sessions.
# NOTE(review): the excerpt elides a line between the capture and the final
# dump; presumably that is where the test waits for the timeouts to expire --
# confirm against the full file.
2271 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2272 def test_session_timeout(self):
2273 """ CGNAT session timeouts """
2274 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2276 socket.inet_aton(self.snat_addr),
2278 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2279 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2282 self.initiate_tcp_session(self.pg0, self.pg1)
2283 self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2284 pkts = self.create_stream_in(self.pg0, self.pg1)
2285 self.pg0.add_stream(pkts)
2286 self.pg_enable_capture(self.pg_interfaces)
2288 capture = self.pg1.get_capture(len(pkts))
# Every session on the mapping must be gone by the time of this dump.
2291 dms = self.vapi.snat_det_map_dump()
2292 self.assertEqual(0, dms[0].ses_num)
# Fill one inside user's deterministic NAT quota and check what happens to
# the next flow: UDP packets on 1000 distinct ports (1025..2024) are sent
# first, then one more UDP packet (3001 -> 3002) must not be forwarded to pg1
# and must instead trigger an ICMP type 3 / code 1 error back on pg0 quoting
# the offending UDP header. The session count must stay at 1000 and the event
# must be reported over IPFIX to the collector on pg2.
# NOTE(review): the lines that accumulate `pkts`, bind `p` / `icmp` from the
# captures, and open the loops over the IPFIX capture are elided in this
# excerpt -- confirm against the full file.
2294 def test_session_limit_per_user(self):
2295 """ CGNAT maximum 1000 sessions per user should be created """
2296 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2298 socket.inet_aton(self.snat_addr),
2300 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2301 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
# Point the IPFIX exporter at pg2 and turn on SNAT IPFIX logging.
2303 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2304 src_address=self.pg2.local_ip4n,
2306 template_interval=10)
2307 self.vapi.snat_ipfix()
# range(1025, 2025) yields exactly 1000 source/destination port values.
2310 for port in range(1025, 2025):
2311 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2312 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2313 UDP(sport=port, dport=port))
2316 self.pg0.add_stream(pkts)
2317 self.pg_enable_capture(self.pg_interfaces)
2319 capture = self.pg1.get_capture(len(pkts))
# One flow beyond the 1000 already sent: nothing may come out of pg1.
2321 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2322 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2323 UDP(sport=3001, dport=3002))
2324 self.pg0.add_stream(p)
2325 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): the value assigned here is overwritten two lines below
# without being read; the assignment looks unnecessary.
2327 capture = self.pg1.assert_nothing_captured()
2329 # verify ICMP error packet
2330 capture = self.pg0.get_capture(1)
2332 self.assertTrue(p.haslayer(ICMP))
# ICMP type 3 is Destination Unreachable and code 1 is host unreachable
# (RFC 792); the quoted inner packet must carry the rejected UDP ports.
2334 self.assertEqual(icmp.type, 3)
2335 self.assertEqual(icmp.code, 1)
2336 self.assertTrue(icmp.haslayer(IPerror))
2337 inner_ip = icmp[IPerror]
2338 self.assertEqual(inner_ip[UDPerror].sport, 3001)
2339 self.assertEqual(inner_ip[UDPerror].dport, 3002)
2341 dms = self.vapi.snat_det_map_dump()
2343 self.assertEqual(1000, dms[0].ses_num)
2345 # verify IPFIX logging
2346 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Two IPFIX packets are expected on pg2; templates found in them are loaded
# into the decoder and data sets are decoded and verified.
2347 capture = self.pg2.get_capture(2)
2348 ipfix = IPFIXDecoder()
2349 # first load template
2351 self.assertTrue(p.haslayer(IPFIX))
2352 if p.haslayer(Template):
2353 ipfix.add_template(p.getlayer(Template))
2354 # verify events in data set
2356 if p.haslayer(Data):
2357 data = ipfix.decode_data_set(p.getlayer(Set))
2358 self.verify_ipfix_max_entries_per_user(data)
# Undo deterministic NAT state left behind by a test: disable SNAT IPFIX
# logging, call snat_det_set_timeouts() with no arguments (presumably
# restoring its defaults -- confirm against the API wrapper), then walk the
# dumped deterministic mappings and SNAT interfaces and issue one API call
# for each.
# NOTE(review): the remaining arguments of the two per-item calls are elided
# in this excerpt; presumably they select deletion -- confirm.
2360 def clear_snat(self):
2362 Clear SNAT configuration.
2364 self.vapi.snat_ipfix(enable=0)
2365 self.vapi.snat_det_set_timeouts()
2366 deterministic_mappings = self.vapi.snat_det_map_dump()
2367 for dsm in deterministic_mappings:
2368 self.vapi.snat_add_det_map(dsm.in_addr,
2374 interfaces = self.vapi.snat_interface_dump()
2375 for intf in interfaces:
2376 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
# Per-test teardown for TestDeterministicNAT: run the parent teardown first,
# then log the "show snat detail" CLI output while VPP is still alive.
# NOTE(review): the `def` line of this method and anything after the logging
# call are not visible in this excerpt -- confirm against the full file.
2381 super(TestDeterministicNAT, self).tearDown()
2382 if not self.vpp_dead:
2383 self.logger.info(self.vapi.cli("show snat detail"))
2387 class TestNAT64(MethodHolder):
2388 """ NAT64 Test Cases """
# Class-level fixture for the NAT64 tests: sets default inside/outside
# TCP/UDP ports and ICMP ids, the NAT pool address in dotted and packed form,
# and creates two packet-generator interfaces. The first interface is recorded
# as the IPv6 side and the second as the IPv4 side.
# NOTE(review): the decorator, the bodies of both interface loops, and the
# lines around the final tearDownClass() call are elided in this excerpt.
# That last call presumably belongs to an error-handling path that cleans up
# when setup fails -- confirm against the full file.
2391 def setUpClass(cls):
2392 super(TestNAT64, cls).setUpClass()
2395 cls.tcp_port_in = 6303
2396 cls.tcp_port_out = 6303
2397 cls.udp_port_in = 6304
2398 cls.udp_port_out = 6304
2399 cls.icmp_id_in = 6305
2400 cls.icmp_id_out = 6305
2401 cls.nat_addr = '10.0.0.3'
2402 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2404 cls.create_pg_interfaces(range(2))
# pg_interfaces[0] is the IPv6 (inside) side, pg_interfaces[1] the IPv4 side.
2405 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2406 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2408 for i in cls.ip6_interfaces:
2413 for i in cls.ip4_interfaces:
2419 super(TestNAT64, cls).tearDownClass()
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    # A single-address range: the same packed IPv4 address is passed as both
    # the first and the last address of the range.
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    # After the add, the pool must hold exactly that one address.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr)
    pool = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(pool), 1)
    self.assertEqual(pool[0].address, pool_addr)

    # After the delete, the pool must be empty again.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr, is_add=0)
    pool = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(pool), 0)
def test_interface(self):
    """ Enable/disable NAT64 feature on the interface """
    inside_index = self.pg0.sw_if_index
    outside_index = self.pg1.sw_if_index

    # pg0 is enabled as an inside interface, pg1 as an outside interface.
    self.vapi.nat64_add_del_interface(inside_index)
    self.vapi.nat64_add_del_interface(outside_index, is_inside=0)

    dump = self.vapi.nat64_interface_dump()
    self.assertEqual(len(dump), 2)

    # Each of the two interfaces must be reported with the right direction.
    seen = set()
    for entry in dump:
        index = entry.sw_if_index
        if index == inside_index:
            self.assertEqual(entry.is_inside, 1)
        elif index == outside_index:
            self.assertEqual(entry.is_inside, 0)
        else:
            continue
        seen.add(index)
    self.assertTrue(inside_index in seen)
    self.assertTrue(outside_index in seen)

    # The matching graph node must show up in each interface's feature list.
    expected_nodes = (
        ("show interface features pg0", 'nat64-in2out'),
        ("show interface features pg1", 'nat64-out2in'),
    )
    for command, node in expected_nodes:
        features = self.vapi.cli(command)
        self.assertNotEqual(features.find(node), -1)

    # Disabling the feature on both interfaces must leave the dump empty.
    for index in (inside_index, outside_index):
        self.vapi.nat64_add_del_interface(index, is_add=0)

    dump = self.vapi.nat64_interface_dump()
    self.assertEqual(len(dump), 0)
# Add a static NAT64 BIB entry mapping an IPv6 inside address to an IPv4
# outside address for TCP, check that the TCP BIB dump contains exactly one
# static entry with the same addresses and ports, then issue the second call
# and check that no static entry remains.
# NOTE(review): the definitions of in_port / out_port, the remaining
# arguments of both nat64_add_del_static_bib calls, and the loops that bind
# `bibe` and count `static_bib_num` are elided in this excerpt. `proto` is
# assigned but the visible dumps use IP_PROTOS.tcp directly -- confirm how
# `proto` is used in the full file.
2467 def test_static_bib(self):
2468 """ Add/delete static BIB entry """
2469 in_addr = socket.inet_pton(socket.AF_INET6,
2470 '2001:db8:85a3::8a2e:370:7334')
2471 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
2474 proto = IP_PROTOS.tcp
2476 self.vapi.nat64_add_del_static_bib(in_addr,
2481 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2486 self.assertEqual(bibe.i_addr, in_addr)
2487 self.assertEqual(bibe.o_addr, out_addr)
2488 self.assertEqual(bibe.i_port, in_port)
2489 self.assertEqual(bibe.o_port, out_port)
2490 self.assertEqual(static_bib_num, 1)
# Presumably the delete call (its is_add argument is elided) -- afterwards no
# static entry may remain.
2492 self.vapi.nat64_add_del_static_bib(in_addr,
2498 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2503 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    # Field order matches the order in which the values are checked.
    default_values = (
        ('udp', 300),
        ('icmp', 60),
        ('tcp_trans', 240),
        ('tcp_est', 7440),
        ('tcp_incoming_syn', 6),
    )
    custom_values = (
        ('udp', 200),
        ('icmp', 30),
        ('tcp_trans', 250),
        ('tcp_est', 7450),
        ('tcp_incoming_syn', 10),
    )

    # A fresh configuration must report the default timeouts.
    reply = self.vapi.nat64_get_timeouts()
    for field, expected in default_values:
        self.assertEqual(getattr(reply, field), expected)

    # After setting custom timeouts, the getter must echo them back.
    self.vapi.nat64_set_timeouts(**dict(custom_values))
    reply = self.vapi.nat64_get_timeouts()
    for field, expected in custom_values:
        self.assertEqual(getattr(reply, field), expected)
# NAT64 dynamic translation: with a pool address configured and pg0 / pg1
# enabled as inside / outside, three packets are sent IPv6 -> IPv4 and three
# IPv4 -> IPv6, and then the same exchange is repeated. The session count
# must grow by exactly 3 over the whole test, so the second round is expected
# not to add sessions.
# NOTE(review): the second argument of the pool-range call and a line between
# each pg_enable_capture() and the following get_capture() are elided in this
# excerpt -- confirm against the full file.
2525 def test_dynamic(self):
2526 """ NAT64 dynamic translation test """
2527 self.tcp_port_in = 6303
2528 self.udp_port_in = 6304
2529 self.icmp_id_in = 6305
# Baseline, so the final check is independent of sessions that already exist.
2531 ses_num_start = self.nat64_get_ses_num()
2533 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2535 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2536 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Inside -> outside: packets must leave pg1 sourced from the NAT pool address.
2539 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2540 self.pg0.add_stream(pkts)
2541 self.pg_enable_capture(self.pg_interfaces)
2543 capture = self.pg1.get_capture(3)
2544 self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
2545 dst_ip=self.pg1.remote_ip4)
# Outside -> inside: replies must reach pg0 with an IPv6 source built from
# the 64:ff9b:: prefix and pg1's IPv4 address. The throwaway IPv6() object is
# used to obtain that address in scapy's own textual form.
2548 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2549 self.pg1.add_stream(pkts)
2550 self.pg_enable_capture(self.pg_interfaces)
2552 capture = self.pg0.get_capture(3)
2553 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2554 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# Second round, inside -> outside again.
2557 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2558 self.pg0.add_stream(pkts)
2559 self.pg_enable_capture(self.pg_interfaces)
2561 capture = self.pg1.get_capture(3)
2562 self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
2563 dst_ip=self.pg1.remote_ip4)
# Second round, outside -> inside again.
2566 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2567 self.pg1.add_stream(pkts)
2568 self.pg_enable_capture(self.pg_interfaces)
2570 capture = self.pg0.get_capture(3)
2571 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2572 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2574 ses_num_end = self.nat64_get_ses_num()
2576 self.assertEqual(ses_num_end - ses_num_start, 3)
# NAT64 static translation: three static BIB entries are installed for
# pg0's IPv6 host, then one round of IPv6 -> IPv4 and IPv4 -> IPv6 traffic is
# verified. The outbound check passes same_port=True, and the inside and
# outside ports / ICMP ids are set to identical values beforehand. Three
# sessions must have been added by the end.
# NOTE(review): the remaining arguments of the pool-range call and of all
# three nat64_add_del_static_bib calls are elided in this excerpt; presumably
# there is one entry each for TCP, UDP and ICMP -- confirm against the full
# file.
2578 def test_static(self):
2579 """ NAT64 static translation test """
2580 self.tcp_port_in = 60303
2581 self.udp_port_in = 60304
2582 self.icmp_id_in = 60305
2583 self.tcp_port_out = 60303
2584 self.udp_port_out = 60304
2585 self.icmp_id_out = 60305
# Baseline, so the final check is independent of sessions that already exist.
2587 ses_num_start = self.nat64_get_ses_num()
2589 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2591 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2592 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2594 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2599 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2604 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
# Inside -> outside.
2611 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2612 self.pg0.add_stream(pkts)
2613 self.pg_enable_capture(self.pg_interfaces)
2615 capture = self.pg1.get_capture(3)
2616 self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
2617 dst_ip=self.pg1.remote_ip4, same_port=True)
# Outside -> inside: the expected IPv6 source is the 64:ff9b:: prefix combined
# with pg1's IPv4 address.
2620 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2621 self.pg1.add_stream(pkts)
2622 self.pg_enable_capture(self.pg_interfaces)
2624 capture = self.pg0.get_capture(3)
2625 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2626 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2628 ses_num_end = self.nat64_get_ses_num()
2630 self.assertEqual(ses_num_end - ses_num_start, 3)
# Extended-only test: with the NAT64 ICMP timeout set to 5, three packets are
# sent from the IPv6 side, and the session count taken right after the capture
# must differ from the count taken later.
# NOTE(review): the second argument of the pool-range call and the lines
# between the two session counts are elided in this excerpt; presumably the
# test waits there for the ICMP session to expire -- confirm. Also note that
# assertNotEqual only proves the count changed, not that it decreased.
2632 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2633 def test_session_timeout(self):
2634 """ NAT64 session timeout """
2635 self.icmp_id_in = 1234
2636 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2638 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2639 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2640 self.vapi.nat64_set_timeouts(icmp=5)
2642 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2643 self.pg0.add_stream(pkts)
2644 self.pg_enable_capture(self.pg_interfaces)
2646 capture = self.pg1.get_capture(3)
2648 ses_num_before_timeout = self.nat64_get_ses_num()
2652 # ICMP session after timeout
2653 ses_num_after_timeout = self.nat64_get_ses_num()
2654 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
# NAT64 translation of ICMP error messages. Sessions are first created with
# one round of traffic in each direction. Each packet captured earlier is
# then wrapped in an ICMP error and sent back through the translator:
#   * ICMPv6 Destination Unreachable code 1 sent from the IPv6 side must come
#     out of pg1 as ICMPv4 type 3 / code 13 quoting the translated IPv4 packet;
#   * ICMPv4 type 3 / code 13 sent from the IPv4 side must come out of pg0 as
#     ICMPv6 Destination Unreachable code 1 quoting the IPv6 packet.
# NOTE(review): the second argument of the pool-range call, the else-branches
# of the inner protocol checks, and the lines around both logger.error calls
# are elided in this excerpt -- confirm against the full file.
2656 def test_icmp_error(self):
2657 """ NAT64 ICMP Error message translation """
2658 self.tcp_port_in = 6303
2659 self.udp_port_in = 6304
2660 self.icmp_id_in = 6305
# NOTE(review): ses_num_start is not read again in the visible lines.
2662 ses_num_start = self.nat64_get_ses_num()
2664 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2666 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2667 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2669 # send some packets to create sessions
2670 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2671 self.pg0.add_stream(pkts)
2672 self.pg_enable_capture(self.pg_interfaces)
# capture_ip4 is kept: these translated packets are quoted inside the ICMPv4
# errors built further down.
2674 capture_ip4 = self.pg1.get_capture(len(pkts))
2675 self.verify_capture_out(capture_ip4, packet_num=3,
2676 nat_ip=self.nat_addr,
2677 dst_ip=self.pg1.remote_ip4)
2679 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2680 self.pg1.add_stream(pkts)
2681 self.pg_enable_capture(self.pg_interfaces)
# capture_ip6 is kept: these packets are quoted inside the ICMPv6 errors
# built next.
2683 capture_ip6 = self.pg0.get_capture(len(pkts))
2684 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2685 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
2686 self.pg0.remote_ip6)
# IPv6 side reports Destination Unreachable (code 1) for every packet it
# received; the IPv6 layer of each captured packet is used as the quoted
# payload.
2689 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2690 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
2691 ICMPv6DestUnreach(code=1) /
2692 packet[IPv6] for packet in capture_ip6]
2693 self.pg0.add_stream(pkts)
2694 self.pg_enable_capture(self.pg_interfaces)
2696 capture = self.pg1.get_capture(len(pkts))
# Translated errors must be sourced from the NAT address, and the quoted inner
# header must be the IPv4 view of the original packet (pg1 -> NAT address).
2697 for packet in capture:
2699 self.assertEqual(packet[IP].src, self.nat_addr)
2700 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2701 self.assertEqual(packet[ICMP].type, 3)
2702 self.assertEqual(packet[ICMP].code, 13)
2703 inner = packet[IPerror]
2704 self.assertEqual(inner.src, self.pg1.remote_ip4)
2705 self.assertEqual(inner.dst, self.nat_addr)
2706 if inner.haslayer(TCPerror):
2707 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
2708 elif inner.haslayer(UDPerror):
2709 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
2711 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
2713 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# IPv4 side reports type 3 / code 13 for every translated packet it received;
# the IP layer of each captured packet is used as the quoted payload.
2717 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2718 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2719 ICMP(type=3, code=13) /
2720 packet[IP] for packet in capture_ip4]
2721 self.pg1.add_stream(pkts)
2722 self.pg_enable_capture(self.pg_interfaces)
2724 capture = self.pg0.get_capture(len(pkts))
# Translated errors must arrive from the 64:ff9b::-mapped address, and the
# quoted inner header must be the original IPv6 packet sent by pg0's host.
2725 for packet in capture:
2727 self.assertEqual(packet[IPv6].src, ip.src)
2728 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
2729 icmp = packet[ICMPv6DestUnreach]
2730 self.assertEqual(icmp.code, 1)
2731 inner = icmp[IPerror6]
2732 self.assertEqual(inner.src, self.pg0.remote_ip6)
2733 self.assertEqual(inner.dst, ip.src)
2734 if inner.haslayer(TCPerror):
2735 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
2736 elif inner.haslayer(UDPerror):
2737 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
2739 self.assertEqual(inner[ICMPv6EchoRequest].id,
2742 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Helper used by the NAT64 tests to count sessions: it dumps the NAT64
# session table once for each of TCP, UDP and ICMP.
# NOTE(review): the lines that accumulate the three dumps and return the
# result are elided in this excerpt; presumably the lengths are summed --
# confirm against the full file.
2745 def nat64_get_ses_num(self):
2747 Return number of active NAT64 sessions.
2750 st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
2752 st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
2754 st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
# Undo NAT64 state left behind by a test: call nat64_set_timeouts() with no
# arguments (presumably restoring its defaults -- confirm against the API
# wrapper), then walk the dumped NAT64 interfaces, the TCP / UDP / ICMP BIB
# tables and the address pool, issuing one API call per entry.
# NOTE(review): the loop headers over each BIB dump, any filtering of entries,
# and the remaining arguments of every per-entry call are elided in this
# excerpt; presumably they select deletion -- confirm against the full file.
2758 def clear_nat64(self):
2760 Clear NAT64 configuration.
2762 self.vapi.nat64_set_timeouts()
2764 interfaces = self.vapi.nat64_interface_dump()
2765 for intf in interfaces:
2766 self.vapi.nat64_add_del_interface(intf.sw_if_index,
2770 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2773 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
2781 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
2784 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
2792 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
2795 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
# NOTE(review): "adresses" is a misspelling of "addresses"; it is a local
# name only, so renaming it is safe.
2803 adresses = self.vapi.nat64_pool_addr_dump()
2804 for addr in adresses:
2805 self.vapi.nat64_add_del_pool_addr_range(addr.address,
# Per-test teardown for TestNAT64: run the parent teardown first, then, while
# VPP is still alive, log the NAT64 pool, interfaces, and the BIB and session
# tables for TCP, UDP and ICMP to help diagnose failures.
# NOTE(review): the `def` line of this method and anything after the last
# logging call are not visible in this excerpt -- confirm against the full
# file.
2810 super(TestNAT64, self).tearDown()
2811 if not self.vpp_dead:
2812 self.logger.info(self.vapi.cli("show nat64 pool"))
2813 self.logger.info(self.vapi.cli("show nat64 interfaces"))
2814 self.logger.info(self.vapi.cli("show nat64 bib tcp"))
2815 self.logger.info(self.vapi.cli("show nat64 bib udp"))
2816 self.logger.info(self.vapi.cli("show nat64 bib icmp"))
2817 self.logger.info(self.vapi.cli("show nat64 session table tcp"))
2818 self.logger.info(self.vapi.cli("show nat64 session table udp"))
2819 self.logger.info(self.vapi.cli("show nat64 session table icmp"))
# Script entry point: when the file is executed directly, run every test in
# this module through unittest using the project's VppTestRunner.
2822 if __name__ == '__main__':
2823 unittest.main(testRunner=VppTestRunner)