7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
20 class MethodHolder(VppTestCase):
21 """ SNAT create capture and verify method holder """
25 super(MethodHolder, cls).setUpClass()
28 super(MethodHolder, self).tearDown()
30 def create_stream_in(self, in_if, out_if, ttl=64):
32 Create packet stream for inside network
34 :param in_if: Inside interface
35 :param out_if: Outside interface
36 :param ttl: TTL of generated packets
40 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
41 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
42 TCP(sport=self.tcp_port_in, dport=20))
46 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
47 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
48 UDP(sport=self.udp_port_in, dport=20))
52 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
53 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
54 ICMP(id=self.icmp_id_in, type='echo-request'))
59 def create_stream_in_ip6(self, in_if, out_if, hlim=64):
61 Create IPv6 packet stream for inside network
63 :param in_if: Inside interface
64 :param out_if: Outside interface
65 :param ttl: Hop Limit of generated packets
68 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
70 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
71 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
72 TCP(sport=self.tcp_port_in, dport=20))
76 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
77 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
78 UDP(sport=self.udp_port_in, dport=20))
82 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
83 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
84 ICMPv6EchoRequest(id=self.icmp_id_in))
89 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
91 Create packet stream for outside network
93 :param out_if: Outside interface
94 :param dst_ip: Destination IP address (Default use global SNAT address)
95 :param ttl: TTL of generated packets
98 dst_ip = self.snat_addr
101 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
102 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
103 TCP(dport=self.tcp_port_out, sport=20))
107 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
108 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
109 UDP(dport=self.udp_port_out, sport=20))
113 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
114 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
115 ICMP(id=self.icmp_id_out, type='echo-reply'))
120 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
121 packet_num=3, dst_ip=None):
123 Verify captured packets on outside network
125 :param capture: Captured packets
126 :param nat_ip: Translated IP address (Default use global SNAT address)
127 :param same_port: Sorce port number is not translated (Default False)
128 :param packet_num: Expected number of packets (Default 3)
129 :param dst_ip: Destination IP address (Default do not verify)
132 nat_ip = self.snat_addr
133 self.assertEqual(packet_num, len(capture))
134 for packet in capture:
136 self.assertEqual(packet[IP].src, nat_ip)
137 if dst_ip is not None:
138 self.assertEqual(packet[IP].dst, dst_ip)
139 if packet.haslayer(TCP):
141 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
144 packet[TCP].sport, self.tcp_port_in)
145 self.tcp_port_out = packet[TCP].sport
146 elif packet.haslayer(UDP):
148 self.assertEqual(packet[UDP].sport, self.udp_port_in)
151 packet[UDP].sport, self.udp_port_in)
152 self.udp_port_out = packet[UDP].sport
155 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
157 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
158 self.icmp_id_out = packet[ICMP].id
160 self.logger.error(ppp("Unexpected or invalid packet "
161 "(outside network):", packet))
164 def verify_capture_in(self, capture, in_if, packet_num=3):
166 Verify captured packets on inside network
168 :param capture: Captured packets
169 :param in_if: Inside interface
170 :param packet_num: Expected number of packets (Default 3)
172 self.assertEqual(packet_num, len(capture))
173 for packet in capture:
175 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
176 if packet.haslayer(TCP):
177 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
178 elif packet.haslayer(UDP):
179 self.assertEqual(packet[UDP].dport, self.udp_port_in)
181 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
183 self.logger.error(ppp("Unexpected or invalid packet "
184 "(inside network):", packet))
187 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
189 Verify captured IPv6 packets on inside network
191 :param capture: Captured packets
192 :param src_ip: Source IP
193 :param dst_ip: Destination IP address
194 :param packet_num: Expected number of packets (Default 3)
196 self.assertEqual(packet_num, len(capture))
197 for packet in capture:
199 self.assertEqual(packet[IPv6].src, src_ip)
200 self.assertEqual(packet[IPv6].dst, dst_ip)
201 if packet.haslayer(TCP):
202 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
203 elif packet.haslayer(UDP):
204 self.assertEqual(packet[UDP].dport, self.udp_port_in)
206 self.assertEqual(packet[ICMPv6EchoReply].id,
209 self.logger.error(ppp("Unexpected or invalid packet "
210 "(inside network):", packet))
213 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
215 Verify captured packet that don't have to be translated
217 :param capture: Captured packets
218 :param ingress_if: Ingress interface
219 :param egress_if: Egress interface
221 for packet in capture:
223 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
224 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
225 if packet.haslayer(TCP):
226 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
227 elif packet.haslayer(UDP):
228 self.assertEqual(packet[UDP].sport, self.udp_port_in)
230 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
232 self.logger.error(ppp("Unexpected or invalid packet "
233 "(inside network):", packet))
236 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
237 packet_num=3, icmp_type=11):
239 Verify captured packets with ICMP errors on outside network
241 :param capture: Captured packets
242 :param src_ip: Translated IP address or IP address of VPP
243 (Default use global SNAT address)
244 :param packet_num: Expected number of packets (Default 3)
245 :param icmp_type: Type of error ICMP packet
246 we are expecting (Default 11)
249 src_ip = self.snat_addr
250 self.assertEqual(packet_num, len(capture))
251 for packet in capture:
253 self.assertEqual(packet[IP].src, src_ip)
254 self.assertTrue(packet.haslayer(ICMP))
256 self.assertEqual(icmp.type, icmp_type)
257 self.assertTrue(icmp.haslayer(IPerror))
258 inner_ip = icmp[IPerror]
259 if inner_ip.haslayer(TCPerror):
260 self.assertEqual(inner_ip[TCPerror].dport,
262 elif inner_ip.haslayer(UDPerror):
263 self.assertEqual(inner_ip[UDPerror].dport,
266 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
268 self.logger.error(ppp("Unexpected or invalid packet "
269 "(outside network):", packet))
272 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
275 Verify captured packets with ICMP errors on inside network
277 :param capture: Captured packets
278 :param in_if: Inside interface
279 :param packet_num: Expected number of packets (Default 3)
280 :param icmp_type: Type of error ICMP packet
281 we are expecting (Default 11)
283 self.assertEqual(packet_num, len(capture))
284 for packet in capture:
286 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
287 self.assertTrue(packet.haslayer(ICMP))
289 self.assertEqual(icmp.type, icmp_type)
290 self.assertTrue(icmp.haslayer(IPerror))
291 inner_ip = icmp[IPerror]
292 if inner_ip.haslayer(TCPerror):
293 self.assertEqual(inner_ip[TCPerror].sport,
295 elif inner_ip.haslayer(UDPerror):
296 self.assertEqual(inner_ip[UDPerror].sport,
299 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
301 self.logger.error(ppp("Unexpected or invalid packet "
302 "(inside network):", packet))
305 def verify_ipfix_nat44_ses(self, data):
307 Verify IPFIX NAT44 session create/delete event
309 :param data: Decoded IPFIX data records
311 nat44_ses_create_num = 0
312 nat44_ses_delete_num = 0
313 self.assertEqual(6, len(data))
316 self.assertIn(ord(record[230]), [4, 5])
317 if ord(record[230]) == 4:
318 nat44_ses_create_num += 1
320 nat44_ses_delete_num += 1
322 self.assertEqual(self.pg0.remote_ip4n, record[8])
323 # postNATSourceIPv4Address
324 self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
327 self.assertEqual(struct.pack("!I", 0), record[234])
328 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
329 if IP_PROTOS.icmp == ord(record[4]):
330 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
331 self.assertEqual(struct.pack("!H", self.icmp_id_out),
333 elif IP_PROTOS.tcp == ord(record[4]):
334 self.assertEqual(struct.pack("!H", self.tcp_port_in),
336 self.assertEqual(struct.pack("!H", self.tcp_port_out),
338 elif IP_PROTOS.udp == ord(record[4]):
339 self.assertEqual(struct.pack("!H", self.udp_port_in),
341 self.assertEqual(struct.pack("!H", self.udp_port_out),
344 self.fail("Invalid protocol")
345 self.assertEqual(3, nat44_ses_create_num)
346 self.assertEqual(3, nat44_ses_delete_num)
348 def verify_ipfix_addr_exhausted(self, data):
350 Verify IPFIX NAT addresses event
352 :param data: Decoded IPFIX data records
354 self.assertEqual(1, len(data))
357 self.assertEqual(ord(record[230]), 3)
359 self.assertEqual(struct.pack("!I", 0), record[283])
362 class TestSNAT(MethodHolder):
363 """ SNAT Test Cases """
367 super(TestSNAT, cls).setUpClass()
370 cls.tcp_port_in = 6303
371 cls.tcp_port_out = 6303
372 cls.udp_port_in = 6304
373 cls.udp_port_out = 6304
374 cls.icmp_id_in = 6305
375 cls.icmp_id_out = 6305
376 cls.snat_addr = '10.0.0.3'
377 cls.ipfix_src_port = 4739
378 cls.ipfix_domain_id = 1
380 cls.create_pg_interfaces(range(9))
381 cls.interfaces = list(cls.pg_interfaces[0:4])
383 for i in cls.interfaces:
388 cls.pg0.generate_remote_hosts(3)
389 cls.pg0.configure_ipv4_neighbors()
391 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
393 cls.pg4._local_ip4 = "172.16.255.1"
394 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
395 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
396 cls.pg4.set_table_ip4(10)
397 cls.pg5._local_ip4 = "172.16.255.3"
398 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
399 cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
400 cls.pg5.set_table_ip4(10)
401 cls.pg6._local_ip4 = "172.16.255.1"
402 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
403 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
404 cls.pg6.set_table_ip4(20)
405 for i in cls.overlapping_interfaces:
414 super(TestSNAT, cls).tearDownClass()
417 def clear_snat(self):
419 Clear SNAT configuration.
421 # I found no elegant way to do this
422 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
423 dst_address_length=32,
424 next_hop_address=self.pg7.remote_ip4n,
425 next_hop_sw_if_index=self.pg7.sw_if_index,
427 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
428 dst_address_length=32,
429 next_hop_address=self.pg8.remote_ip4n,
430 next_hop_sw_if_index=self.pg8.sw_if_index,
433 for intf in [self.pg7, self.pg8]:
434 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
436 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
441 if self.pg7.has_ip4_config:
442 self.pg7.unconfig_ip4()
444 interfaces = self.vapi.snat_interface_addr_dump()
445 for intf in interfaces:
446 self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
448 self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
449 domain_id=self.ipfix_domain_id)
450 self.ipfix_src_port = 4739
451 self.ipfix_domain_id = 1
453 interfaces = self.vapi.snat_interface_dump()
454 for intf in interfaces:
455 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
459 static_mappings = self.vapi.snat_static_mapping_dump()
460 for sm in static_mappings:
461 self.vapi.snat_add_static_mapping(sm.local_ip_address,
462 sm.external_ip_address,
463 local_port=sm.local_port,
464 external_port=sm.external_port,
465 addr_only=sm.addr_only,
467 protocol=sm.protocol,
470 adresses = self.vapi.snat_address_dump()
471 for addr in adresses:
472 self.vapi.snat_add_address_range(addr.ip_address,
476 def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
477 local_port=0, external_port=0, vrf_id=0,
478 is_add=1, external_sw_if_index=0xFFFFFFFF,
481 Add/delete S-NAT static mapping
483 :param local_ip: Local IP address
484 :param external_ip: External IP address
485 :param local_port: Local port number (Optional)
486 :param external_port: External port number (Optional)
487 :param vrf_id: VRF ID (Default 0)
488 :param is_add: 1 if add, 0 if delete (Default add)
489 :param external_sw_if_index: External interface instead of IP address
490 :param proto: IP protocol (Mandatory if port specified)
493 if local_port and external_port:
495 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
496 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
497 self.vapi.snat_add_static_mapping(
500 external_sw_if_index,
508 def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
510 Add/delete S-NAT address
512 :param ip: IP address
513 :param is_add: 1 if add, 0 if delete (Default add)
515 snat_addr = socket.inet_pton(socket.AF_INET, ip)
516 self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
519 def test_dynamic(self):
520 """ SNAT dynamic translation test """
522 self.snat_add_address(self.snat_addr)
523 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
524 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
528 pkts = self.create_stream_in(self.pg0, self.pg1)
529 self.pg0.add_stream(pkts)
530 self.pg_enable_capture(self.pg_interfaces)
532 capture = self.pg1.get_capture(len(pkts))
533 self.verify_capture_out(capture)
536 pkts = self.create_stream_out(self.pg1)
537 self.pg1.add_stream(pkts)
538 self.pg_enable_capture(self.pg_interfaces)
540 capture = self.pg0.get_capture(len(pkts))
541 self.verify_capture_in(capture, self.pg0)
543 def test_dynamic_icmp_errors_in2out_ttl_1(self):
544 """ SNAT handling of client packets with TTL=1 """
546 self.snat_add_address(self.snat_addr)
547 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
548 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
551 # Client side - generate traffic
552 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
553 self.pg0.add_stream(pkts)
554 self.pg_enable_capture(self.pg_interfaces)
557 # Client side - verify ICMP type 11 packets
558 capture = self.pg0.get_capture(len(pkts))
559 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
561 def test_dynamic_icmp_errors_out2in_ttl_1(self):
562 """ SNAT handling of server packets with TTL=1 """
564 self.snat_add_address(self.snat_addr)
565 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
566 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
569 # Client side - create sessions
570 pkts = self.create_stream_in(self.pg0, self.pg1)
571 self.pg0.add_stream(pkts)
572 self.pg_enable_capture(self.pg_interfaces)
575 # Server side - generate traffic
576 capture = self.pg1.get_capture(len(pkts))
577 self.verify_capture_out(capture)
578 pkts = self.create_stream_out(self.pg1, ttl=1)
579 self.pg1.add_stream(pkts)
580 self.pg_enable_capture(self.pg_interfaces)
583 # Server side - verify ICMP type 11 packets
584 capture = self.pg1.get_capture(len(pkts))
585 self.verify_capture_out_with_icmp_errors(capture,
586 src_ip=self.pg1.local_ip4)
588 def test_dynamic_icmp_errors_in2out_ttl_2(self):
589 """ SNAT handling of error responses to client packets with TTL=2 """
591 self.snat_add_address(self.snat_addr)
592 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
593 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
596 # Client side - generate traffic
597 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
598 self.pg0.add_stream(pkts)
599 self.pg_enable_capture(self.pg_interfaces)
602 # Server side - simulate ICMP type 11 response
603 capture = self.pg1.get_capture(len(pkts))
604 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
605 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
606 ICMP(type=11) / packet[IP] for packet in capture]
607 self.pg1.add_stream(pkts)
608 self.pg_enable_capture(self.pg_interfaces)
611 # Client side - verify ICMP type 11 packets
612 capture = self.pg0.get_capture(len(pkts))
613 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
615 def test_dynamic_icmp_errors_out2in_ttl_2(self):
616 """ SNAT handling of error responses to server packets with TTL=2 """
618 self.snat_add_address(self.snat_addr)
619 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
620 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
623 # Client side - create sessions
624 pkts = self.create_stream_in(self.pg0, self.pg1)
625 self.pg0.add_stream(pkts)
626 self.pg_enable_capture(self.pg_interfaces)
629 # Server side - generate traffic
630 capture = self.pg1.get_capture(len(pkts))
631 self.verify_capture_out(capture)
632 pkts = self.create_stream_out(self.pg1, ttl=2)
633 self.pg1.add_stream(pkts)
634 self.pg_enable_capture(self.pg_interfaces)
637 # Client side - simulate ICMP type 11 response
638 capture = self.pg0.get_capture(len(pkts))
639 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
640 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
641 ICMP(type=11) / packet[IP] for packet in capture]
642 self.pg0.add_stream(pkts)
643 self.pg_enable_capture(self.pg_interfaces)
646 # Server side - verify ICMP type 11 packets
647 capture = self.pg1.get_capture(len(pkts))
648 self.verify_capture_out_with_icmp_errors(capture)
650 def test_ping_out_interface_from_outside(self):
651 """ Ping SNAT out interface from outside network """
653 self.snat_add_address(self.snat_addr)
654 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
655 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
658 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
659 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
660 ICMP(id=self.icmp_id_out, type='echo-request'))
662 self.pg1.add_stream(pkts)
663 self.pg_enable_capture(self.pg_interfaces)
665 capture = self.pg1.get_capture(len(pkts))
666 self.assertEqual(1, len(capture))
669 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
670 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
671 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
672 self.assertEqual(packet[ICMP].type, 0) # echo reply
674 self.logger.error(ppp("Unexpected or invalid packet "
675 "(outside network):", packet))
678 def test_ping_internal_host_from_outside(self):
679 """ Ping internal host from outside network """
681 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
682 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
683 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
687 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
688 IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
689 ICMP(id=self.icmp_id_out, type='echo-request'))
690 self.pg1.add_stream(pkt)
691 self.pg_enable_capture(self.pg_interfaces)
693 capture = self.pg0.get_capture(1)
694 self.verify_capture_in(capture, self.pg0, packet_num=1)
695 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
698 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
699 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
700 ICMP(id=self.icmp_id_in, type='echo-reply'))
701 self.pg0.add_stream(pkt)
702 self.pg_enable_capture(self.pg_interfaces)
704 capture = self.pg1.get_capture(1)
705 self.verify_capture_out(capture, same_port=True, packet_num=1)
706 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
708 def test_static_in(self):
709 """ SNAT 1:1 NAT initialized from inside network """
712 self.tcp_port_out = 6303
713 self.udp_port_out = 6304
714 self.icmp_id_out = 6305
716 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
717 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
718 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
722 pkts = self.create_stream_in(self.pg0, self.pg1)
723 self.pg0.add_stream(pkts)
724 self.pg_enable_capture(self.pg_interfaces)
726 capture = self.pg1.get_capture(len(pkts))
727 self.verify_capture_out(capture, nat_ip, True)
730 pkts = self.create_stream_out(self.pg1, nat_ip)
731 self.pg1.add_stream(pkts)
732 self.pg_enable_capture(self.pg_interfaces)
734 capture = self.pg0.get_capture(len(pkts))
735 self.verify_capture_in(capture, self.pg0)
737 def test_static_out(self):
738 """ SNAT 1:1 NAT initialized from outside network """
741 self.tcp_port_out = 6303
742 self.udp_port_out = 6304
743 self.icmp_id_out = 6305
745 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
746 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
747 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
751 pkts = self.create_stream_out(self.pg1, nat_ip)
752 self.pg1.add_stream(pkts)
753 self.pg_enable_capture(self.pg_interfaces)
755 capture = self.pg0.get_capture(len(pkts))
756 self.verify_capture_in(capture, self.pg0)
759 pkts = self.create_stream_in(self.pg0, self.pg1)
760 self.pg0.add_stream(pkts)
761 self.pg_enable_capture(self.pg_interfaces)
763 capture = self.pg1.get_capture(len(pkts))
764 self.verify_capture_out(capture, nat_ip, True)
766 def test_static_with_port_in(self):
767 """ SNAT 1:1 NAT with port initialized from inside network """
769 self.tcp_port_out = 3606
770 self.udp_port_out = 3607
771 self.icmp_id_out = 3608
773 self.snat_add_address(self.snat_addr)
774 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
775 self.tcp_port_in, self.tcp_port_out,
777 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
778 self.udp_port_in, self.udp_port_out,
780 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
781 self.icmp_id_in, self.icmp_id_out,
782 proto=IP_PROTOS.icmp)
783 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
784 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
788 pkts = self.create_stream_in(self.pg0, self.pg1)
789 self.pg0.add_stream(pkts)
790 self.pg_enable_capture(self.pg_interfaces)
792 capture = self.pg1.get_capture(len(pkts))
793 self.verify_capture_out(capture)
796 pkts = self.create_stream_out(self.pg1)
797 self.pg1.add_stream(pkts)
798 self.pg_enable_capture(self.pg_interfaces)
800 capture = self.pg0.get_capture(len(pkts))
801 self.verify_capture_in(capture, self.pg0)
803 def test_static_with_port_out(self):
804 """ SNAT 1:1 NAT with port initialized from outside network """
806 self.tcp_port_out = 30606
807 self.udp_port_out = 30607
808 self.icmp_id_out = 30608
810 self.snat_add_address(self.snat_addr)
811 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
812 self.tcp_port_in, self.tcp_port_out,
814 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
815 self.udp_port_in, self.udp_port_out,
817 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
818 self.icmp_id_in, self.icmp_id_out,
819 proto=IP_PROTOS.icmp)
820 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
821 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
825 pkts = self.create_stream_out(self.pg1)
826 self.pg1.add_stream(pkts)
827 self.pg_enable_capture(self.pg_interfaces)
829 capture = self.pg0.get_capture(len(pkts))
830 self.verify_capture_in(capture, self.pg0)
833 pkts = self.create_stream_in(self.pg0, self.pg1)
834 self.pg0.add_stream(pkts)
835 self.pg_enable_capture(self.pg_interfaces)
837 capture = self.pg1.get_capture(len(pkts))
838 self.verify_capture_out(capture)
840 def test_static_vrf_aware(self):
841 """ SNAT 1:1 NAT VRF awareness """
843 nat_ip1 = "10.0.0.30"
844 nat_ip2 = "10.0.0.40"
845 self.tcp_port_out = 6303
846 self.udp_port_out = 6304
847 self.icmp_id_out = 6305
849 self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
851 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
853 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
855 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
856 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
858 # inside interface VRF match SNAT static mapping VRF
859 pkts = self.create_stream_in(self.pg4, self.pg3)
860 self.pg4.add_stream(pkts)
861 self.pg_enable_capture(self.pg_interfaces)
863 capture = self.pg3.get_capture(len(pkts))
864 self.verify_capture_out(capture, nat_ip1, True)
866 # inside interface VRF don't match SNAT static mapping VRF (packets
868 pkts = self.create_stream_in(self.pg0, self.pg3)
869 self.pg0.add_stream(pkts)
870 self.pg_enable_capture(self.pg_interfaces)
872 self.pg3.assert_nothing_captured()
874 def test_multiple_inside_interfaces(self):
875 """ SNAT multiple inside interfaces (non-overlapping address space) """
877 self.snat_add_address(self.snat_addr)
878 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
879 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
880 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
883 # between two S-NAT inside interfaces (no translation)
884 pkts = self.create_stream_in(self.pg0, self.pg1)
885 self.pg0.add_stream(pkts)
886 self.pg_enable_capture(self.pg_interfaces)
888 capture = self.pg1.get_capture(len(pkts))
889 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
891 # from S-NAT inside to interface without S-NAT feature (no translation)
892 pkts = self.create_stream_in(self.pg0, self.pg2)
893 self.pg0.add_stream(pkts)
894 self.pg_enable_capture(self.pg_interfaces)
896 capture = self.pg2.get_capture(len(pkts))
897 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
899 # in2out 1st interface
900 pkts = self.create_stream_in(self.pg0, self.pg3)
901 self.pg0.add_stream(pkts)
902 self.pg_enable_capture(self.pg_interfaces)
904 capture = self.pg3.get_capture(len(pkts))
905 self.verify_capture_out(capture)
907 # out2in 1st interface
908 pkts = self.create_stream_out(self.pg3)
909 self.pg3.add_stream(pkts)
910 self.pg_enable_capture(self.pg_interfaces)
912 capture = self.pg0.get_capture(len(pkts))
913 self.verify_capture_in(capture, self.pg0)
915 # in2out 2nd interface
916 pkts = self.create_stream_in(self.pg1, self.pg3)
917 self.pg1.add_stream(pkts)
918 self.pg_enable_capture(self.pg_interfaces)
920 capture = self.pg3.get_capture(len(pkts))
921 self.verify_capture_out(capture)
923 # out2in 2nd interface
924 pkts = self.create_stream_out(self.pg3)
925 self.pg3.add_stream(pkts)
926 self.pg_enable_capture(self.pg_interfaces)
928 capture = self.pg1.get_capture(len(pkts))
929 self.verify_capture_in(capture, self.pg1)
931 def test_inside_overlapping_interfaces(self):
932 """ SNAT multiple inside interfaces with overlapping address space """
934 static_nat_ip = "10.0.0.10"
935 self.snat_add_address(self.snat_addr)
936 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
938 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
939 self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
940 self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
941 self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
944 # between S-NAT inside interfaces with same VRF (no translation)
945 pkts = self.create_stream_in(self.pg4, self.pg5)
946 self.pg4.add_stream(pkts)
947 self.pg_enable_capture(self.pg_interfaces)
949 capture = self.pg5.get_capture(len(pkts))
950 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
952 # between S-NAT inside interfaces with different VRF (hairpinning)
953 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
954 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
955 TCP(sport=1234, dport=5678))
956 self.pg4.add_stream(p)
957 self.pg_enable_capture(self.pg_interfaces)
959 capture = self.pg6.get_capture(1)
964 self.assertEqual(ip.src, self.snat_addr)
965 self.assertEqual(ip.dst, self.pg6.remote_ip4)
966 self.assertNotEqual(tcp.sport, 1234)
967 self.assertEqual(tcp.dport, 5678)
969 self.logger.error(ppp("Unexpected or invalid packet:", p))
972 # in2out 1st interface
973 pkts = self.create_stream_in(self.pg4, self.pg3)
974 self.pg4.add_stream(pkts)
975 self.pg_enable_capture(self.pg_interfaces)
977 capture = self.pg3.get_capture(len(pkts))
978 self.verify_capture_out(capture)
980 # out2in 1st interface
981 pkts = self.create_stream_out(self.pg3)
982 self.pg3.add_stream(pkts)
983 self.pg_enable_capture(self.pg_interfaces)
985 capture = self.pg4.get_capture(len(pkts))
986 self.verify_capture_in(capture, self.pg4)
988 # in2out 2nd interface
989 pkts = self.create_stream_in(self.pg5, self.pg3)
990 self.pg5.add_stream(pkts)
991 self.pg_enable_capture(self.pg_interfaces)
993 capture = self.pg3.get_capture(len(pkts))
994 self.verify_capture_out(capture)
996 # out2in 2nd interface
997 pkts = self.create_stream_out(self.pg3)
998 self.pg3.add_stream(pkts)
999 self.pg_enable_capture(self.pg_interfaces)
1001 capture = self.pg5.get_capture(len(pkts))
1002 self.verify_capture_in(capture, self.pg5)
1005 addresses = self.vapi.snat_address_dump()
1006 self.assertEqual(len(addresses), 1)
1007 sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
1008 self.assertEqual(len(sessions), 3)
1009 for session in sessions:
1010 self.assertFalse(session.is_static)
1011 self.assertEqual(session.inside_ip_address[0:4],
1012 self.pg5.remote_ip4n)
1013 self.assertEqual(session.outside_ip_address,
1014 addresses[0].ip_address)
1015 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1016 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1017 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1018 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1019 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1020 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1021 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1022 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1023 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1025 # in2out 3rd interface
1026 pkts = self.create_stream_in(self.pg6, self.pg3)
1027 self.pg6.add_stream(pkts)
1028 self.pg_enable_capture(self.pg_interfaces)
1030 capture = self.pg3.get_capture(len(pkts))
1031 self.verify_capture_out(capture, static_nat_ip, True)
1033 # out2in 3rd interface
1034 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1035 self.pg3.add_stream(pkts)
1036 self.pg_enable_capture(self.pg_interfaces)
1038 capture = self.pg6.get_capture(len(pkts))
1039 self.verify_capture_in(capture, self.pg6)
1041 # general user and session dump verifications
1042 users = self.vapi.snat_user_dump()
1043 self.assertTrue(len(users) >= 3)
1044 addresses = self.vapi.snat_address_dump()
1045 self.assertEqual(len(addresses), 1)
1047 sessions = self.vapi.snat_user_session_dump(user.ip_address,
1049 for session in sessions:
1050 self.assertEqual(user.ip_address, session.inside_ip_address)
1051 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1052 self.assertTrue(session.protocol in
1053 [IP_PROTOS.tcp, IP_PROTOS.udp,
1057 sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
1058 self.assertTrue(len(sessions) >= 4)
1059 for session in sessions:
1060 self.assertFalse(session.is_static)
1061 self.assertEqual(session.inside_ip_address[0:4],
1062 self.pg4.remote_ip4n)
1063 self.assertEqual(session.outside_ip_address,
1064 addresses[0].ip_address)
1067 sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1068 self.assertTrue(len(sessions) >= 3)
1069 for session in sessions:
1070 self.assertTrue(session.is_static)
1071 self.assertEqual(session.inside_ip_address[0:4],
1072 self.pg6.remote_ip4n)
1073 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1074 map(int, static_nat_ip.split('.')))
1075 self.assertTrue(session.inside_port in
1076 [self.tcp_port_in, self.udp_port_in,
1079 def test_hairpinning(self):
1080 """ SNAT hairpinning - 1:1 NAT with port"""
1082 host = self.pg0.remote_hosts[0]
1083 server = self.pg0.remote_hosts[1]
1086 server_in_port = 5678
1087 server_out_port = 8765
1089 self.snat_add_address(self.snat_addr)
1090 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1091 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1093 # add static mapping for server
1094 self.snat_add_static_mapping(server.ip4, self.snat_addr,
1095 server_in_port, server_out_port,
1096 proto=IP_PROTOS.tcp)
1098 # send packet from host to server
1099 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1100 IP(src=host.ip4, dst=self.snat_addr) /
1101 TCP(sport=host_in_port, dport=server_out_port))
1102 self.pg0.add_stream(p)
1103 self.pg_enable_capture(self.pg_interfaces)
1105 capture = self.pg0.get_capture(1)
1110 self.assertEqual(ip.src, self.snat_addr)
1111 self.assertEqual(ip.dst, server.ip4)
1112 self.assertNotEqual(tcp.sport, host_in_port)
1113 self.assertEqual(tcp.dport, server_in_port)
1114 host_out_port = tcp.sport
1116 self.logger.error(ppp("Unexpected or invalid packet:", p))
1119 # send reply from server to host
1120 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1121 IP(src=server.ip4, dst=self.snat_addr) /
1122 TCP(sport=server_in_port, dport=host_out_port))
1123 self.pg0.add_stream(p)
1124 self.pg_enable_capture(self.pg_interfaces)
1126 capture = self.pg0.get_capture(1)
1131 self.assertEqual(ip.src, self.snat_addr)
1132 self.assertEqual(ip.dst, host.ip4)
1133 self.assertEqual(tcp.sport, server_out_port)
1134 self.assertEqual(tcp.dport, host_in_port)
1136 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1139 def test_hairpinning2(self):
1140 """ SNAT hairpinning - 1:1 NAT"""
1142 server1_nat_ip = "10.0.0.10"
1143 server2_nat_ip = "10.0.0.11"
1144 host = self.pg0.remote_hosts[0]
1145 server1 = self.pg0.remote_hosts[1]
1146 server2 = self.pg0.remote_hosts[2]
1147 server_tcp_port = 22
1148 server_udp_port = 20
1150 self.snat_add_address(self.snat_addr)
1151 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1152 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1155 # add static mapping for servers
1156 self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
1157 self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
1161 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1162 IP(src=host.ip4, dst=server1_nat_ip) /
1163 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1165 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1166 IP(src=host.ip4, dst=server1_nat_ip) /
1167 UDP(sport=self.udp_port_in, dport=server_udp_port))
1169 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1170 IP(src=host.ip4, dst=server1_nat_ip) /
1171 ICMP(id=self.icmp_id_in, type='echo-request'))
1173 self.pg0.add_stream(pkts)
1174 self.pg_enable_capture(self.pg_interfaces)
1176 capture = self.pg0.get_capture(len(pkts))
1177 for packet in capture:
1179 self.assertEqual(packet[IP].src, self.snat_addr)
1180 self.assertEqual(packet[IP].dst, server1.ip4)
1181 if packet.haslayer(TCP):
1182 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1183 self.assertEqual(packet[TCP].dport, server_tcp_port)
1184 self.tcp_port_out = packet[TCP].sport
1185 elif packet.haslayer(UDP):
1186 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1187 self.assertEqual(packet[UDP].dport, server_udp_port)
1188 self.udp_port_out = packet[UDP].sport
1190 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1191 self.icmp_id_out = packet[ICMP].id
1193 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1198 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1199 IP(src=server1.ip4, dst=self.snat_addr) /
1200 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1202 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1203 IP(src=server1.ip4, dst=self.snat_addr) /
1204 UDP(sport=server_udp_port, dport=self.udp_port_out))
1206 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1207 IP(src=server1.ip4, dst=self.snat_addr) /
1208 ICMP(id=self.icmp_id_out, type='echo-reply'))
1210 self.pg0.add_stream(pkts)
1211 self.pg_enable_capture(self.pg_interfaces)
1213 capture = self.pg0.get_capture(len(pkts))
1214 for packet in capture:
1216 self.assertEqual(packet[IP].src, server1_nat_ip)
1217 self.assertEqual(packet[IP].dst, host.ip4)
1218 if packet.haslayer(TCP):
1219 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1220 self.assertEqual(packet[TCP].sport, server_tcp_port)
1221 elif packet.haslayer(UDP):
1222 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1223 self.assertEqual(packet[UDP].sport, server_udp_port)
1225 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1227 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1230 # server2 to server1
1232 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1233 IP(src=server2.ip4, dst=server1_nat_ip) /
1234 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1236 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1237 IP(src=server2.ip4, dst=server1_nat_ip) /
1238 UDP(sport=self.udp_port_in, dport=server_udp_port))
1240 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1241 IP(src=server2.ip4, dst=server1_nat_ip) /
1242 ICMP(id=self.icmp_id_in, type='echo-request'))
1244 self.pg0.add_stream(pkts)
1245 self.pg_enable_capture(self.pg_interfaces)
1247 capture = self.pg0.get_capture(len(pkts))
1248 for packet in capture:
1250 self.assertEqual(packet[IP].src, server2_nat_ip)
1251 self.assertEqual(packet[IP].dst, server1.ip4)
1252 if packet.haslayer(TCP):
1253 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1254 self.assertEqual(packet[TCP].dport, server_tcp_port)
1255 self.tcp_port_out = packet[TCP].sport
1256 elif packet.haslayer(UDP):
1257 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1258 self.assertEqual(packet[UDP].dport, server_udp_port)
1259 self.udp_port_out = packet[UDP].sport
1261 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1262 self.icmp_id_out = packet[ICMP].id
1264 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1267 # server1 to server2
1269 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1270 IP(src=server1.ip4, dst=server2_nat_ip) /
1271 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1273 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1274 IP(src=server1.ip4, dst=server2_nat_ip) /
1275 UDP(sport=server_udp_port, dport=self.udp_port_out))
1277 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1278 IP(src=server1.ip4, dst=server2_nat_ip) /
1279 ICMP(id=self.icmp_id_out, type='echo-reply'))
1281 self.pg0.add_stream(pkts)
1282 self.pg_enable_capture(self.pg_interfaces)
1284 capture = self.pg0.get_capture(len(pkts))
1285 for packet in capture:
1287 self.assertEqual(packet[IP].src, server1_nat_ip)
1288 self.assertEqual(packet[IP].dst, server2.ip4)
1289 if packet.haslayer(TCP):
1290 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1291 self.assertEqual(packet[TCP].sport, server_tcp_port)
1292 elif packet.haslayer(UDP):
1293 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1294 self.assertEqual(packet[UDP].sport, server_udp_port)
1296 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1298 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1301 def test_max_translations_per_user(self):
1302 """ MAX translations per user - recycle the least recently used """
1304 self.snat_add_address(self.snat_addr)
1305 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1306 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1309 # get maximum number of translations per user
1310 snat_config = self.vapi.snat_show_config()
1312 # send more than maximum number of translations per user packets
1313 pkts_num = snat_config.max_translations_per_user + 5
1315 for port in range(0, pkts_num):
1316 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1317 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1318 TCP(sport=1025 + port))
1320 self.pg0.add_stream(pkts)
1321 self.pg_enable_capture(self.pg_interfaces)
1324 # verify number of translated packet
1325 self.pg1.get_capture(pkts_num)
1327 def test_interface_addr(self):
1328 """ Acquire SNAT addresses from interface """
1329 self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1331 # no address in NAT pool
1332 adresses = self.vapi.snat_address_dump()
1333 self.assertEqual(0, len(adresses))
1335 # configure interface address and check NAT address pool
1336 self.pg7.config_ip4()
1337 adresses = self.vapi.snat_address_dump()
1338 self.assertEqual(1, len(adresses))
1339 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1341 # remove interface address and check NAT address pool
1342 self.pg7.unconfig_ip4()
1343 adresses = self.vapi.snat_address_dump()
1344 self.assertEqual(0, len(adresses))
1346 def test_interface_addr_static_mapping(self):
1347 """ Static mapping with addresses from interface """
1348 self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1349 self.snat_add_static_mapping('1.2.3.4',
1350 external_sw_if_index=self.pg7.sw_if_index)
1352 # static mappings with external interface
1353 static_mappings = self.vapi.snat_static_mapping_dump()
1354 self.assertEqual(1, len(static_mappings))
1355 self.assertEqual(self.pg7.sw_if_index,
1356 static_mappings[0].external_sw_if_index)
1358 # configure interface address and check static mappings
1359 self.pg7.config_ip4()
1360 static_mappings = self.vapi.snat_static_mapping_dump()
1361 self.assertEqual(1, len(static_mappings))
1362 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1363 self.pg7.local_ip4n)
1364 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1366 # remove interface address and check static mappings
1367 self.pg7.unconfig_ip4()
1368 static_mappings = self.vapi.snat_static_mapping_dump()
1369 self.assertEqual(0, len(static_mappings))
1371 def test_ipfix_nat44_sess(self):
1372 """ S-NAT IPFIX logging NAT44 session created/delted """
1373 self.ipfix_domain_id = 10
1374 self.ipfix_src_port = 20202
1375 colector_port = 30303
1376 bind_layers(UDP, IPFIX, dport=30303)
1377 self.snat_add_address(self.snat_addr)
1378 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1379 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1381 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1382 src_address=self.pg3.local_ip4n,
1384 template_interval=10,
1385 collector_port=colector_port)
1386 self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1387 src_port=self.ipfix_src_port)
1389 pkts = self.create_stream_in(self.pg0, self.pg1)
1390 self.pg0.add_stream(pkts)
1391 self.pg_enable_capture(self.pg_interfaces)
1393 capture = self.pg1.get_capture(len(pkts))
1394 self.verify_capture_out(capture)
1395 self.snat_add_address(self.snat_addr, is_add=0)
1396 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1397 capture = self.pg3.get_capture(3)
1398 ipfix = IPFIXDecoder()
1399 # first load template
1401 self.assertTrue(p.haslayer(IPFIX))
1402 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1403 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1404 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1405 self.assertEqual(p[UDP].dport, colector_port)
1406 self.assertEqual(p[IPFIX].observationDomainID,
1407 self.ipfix_domain_id)
1408 if p.haslayer(Template):
1409 ipfix.add_template(p.getlayer(Template))
1410 # verify events in data set
1412 if p.haslayer(Data):
1413 data = ipfix.decode_data_set(p.getlayer(Set))
1414 self.verify_ipfix_nat44_ses(data)
1416 def test_ipfix_addr_exhausted(self):
1417 """ S-NAT IPFIX logging NAT addresses exhausted """
1418 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1419 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1421 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1422 src_address=self.pg3.local_ip4n,
1424 template_interval=10)
1425 self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1426 src_port=self.ipfix_src_port)
1428 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1429 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1431 self.pg0.add_stream(p)
1432 self.pg_enable_capture(self.pg_interfaces)
1434 capture = self.pg1.get_capture(0)
1435 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1436 capture = self.pg3.get_capture(3)
1437 ipfix = IPFIXDecoder()
1438 # first load template
1440 self.assertTrue(p.haslayer(IPFIX))
1441 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1442 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1443 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1444 self.assertEqual(p[UDP].dport, 4739)
1445 self.assertEqual(p[IPFIX].observationDomainID,
1446 self.ipfix_domain_id)
1447 if p.haslayer(Template):
1448 ipfix.add_template(p.getlayer(Template))
1449 # verify events in data set
1451 if p.haslayer(Data):
1452 data = ipfix.decode_data_set(p.getlayer(Set))
1453 self.verify_ipfix_addr_exhausted(data)
1455 def test_pool_addr_fib(self):
1456 """ S-NAT add pool addresses to FIB """
1457 static_addr = '10.0.0.10'
1458 self.snat_add_address(self.snat_addr)
1459 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1460 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1462 self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1465 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1466 ARP(op=ARP.who_has, pdst=self.snat_addr,
1467 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1468 self.pg1.add_stream(p)
1469 self.pg_enable_capture(self.pg_interfaces)
1471 capture = self.pg1.get_capture(1)
1472 self.assertTrue(capture[0].haslayer(ARP))
1473 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1476 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1477 ARP(op=ARP.who_has, pdst=static_addr,
1478 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1479 self.pg1.add_stream(p)
1480 self.pg_enable_capture(self.pg_interfaces)
1482 capture = self.pg1.get_capture(1)
1483 self.assertTrue(capture[0].haslayer(ARP))
1484 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1486 # send ARP to non-SNAT interface
1487 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1488 ARP(op=ARP.who_has, pdst=self.snat_addr,
1489 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1490 self.pg2.add_stream(p)
1491 self.pg_enable_capture(self.pg_interfaces)
1493 capture = self.pg1.get_capture(0)
1495 # remove addresses and verify
1496 self.snat_add_address(self.snat_addr, is_add=0)
1497 self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1500 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1501 ARP(op=ARP.who_has, pdst=self.snat_addr,
1502 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1503 self.pg1.add_stream(p)
1504 self.pg_enable_capture(self.pg_interfaces)
1506 capture = self.pg1.get_capture(0)
1508 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1509 ARP(op=ARP.who_has, pdst=static_addr,
1510 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1511 self.pg1.add_stream(p)
1512 self.pg_enable_capture(self.pg_interfaces)
1514 capture = self.pg1.get_capture(0)
1516 def test_vrf_mode(self):
1517 """ S-NAT tenant VRF aware address pool mode """
1521 nat_ip1 = "10.0.0.10"
1522 nat_ip2 = "10.0.0.11"
1524 self.pg0.unconfig_ip4()
1525 self.pg1.unconfig_ip4()
1526 self.pg0.set_table_ip4(vrf_id1)
1527 self.pg1.set_table_ip4(vrf_id2)
1528 self.pg0.config_ip4()
1529 self.pg1.config_ip4()
1531 self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1532 self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1533 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1534 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1535 self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1539 pkts = self.create_stream_in(self.pg0, self.pg2)
1540 self.pg0.add_stream(pkts)
1541 self.pg_enable_capture(self.pg_interfaces)
1543 capture = self.pg2.get_capture(len(pkts))
1544 self.verify_capture_out(capture, nat_ip1)
1547 pkts = self.create_stream_in(self.pg1, self.pg2)
1548 self.pg1.add_stream(pkts)
1549 self.pg_enable_capture(self.pg_interfaces)
1551 capture = self.pg2.get_capture(len(pkts))
1552 self.verify_capture_out(capture, nat_ip2)
1554 def test_vrf_feature_independent(self):
1555 """ S-NAT tenant VRF independent address pool mode """
1557 nat_ip1 = "10.0.0.10"
1558 nat_ip2 = "10.0.0.11"
1560 self.snat_add_address(nat_ip1)
1561 self.snat_add_address(nat_ip2)
1562 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1563 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1564 self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1568 pkts = self.create_stream_in(self.pg0, self.pg2)
1569 self.pg0.add_stream(pkts)
1570 self.pg_enable_capture(self.pg_interfaces)
1572 capture = self.pg2.get_capture(len(pkts))
1573 self.verify_capture_out(capture, nat_ip1)
1576 pkts = self.create_stream_in(self.pg1, self.pg2)
1577 self.pg1.add_stream(pkts)
1578 self.pg_enable_capture(self.pg_interfaces)
1580 capture = self.pg2.get_capture(len(pkts))
1581 self.verify_capture_out(capture, nat_ip1)
1583 def test_dynamic_ipless_interfaces(self):
1584 """ SNAT interfaces without configured ip dynamic map """
1586 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1587 self.pg7.remote_mac,
1588 self.pg7.remote_ip4n,
1590 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1591 self.pg8.remote_mac,
1592 self.pg8.remote_ip4n,
1595 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1596 dst_address_length=32,
1597 next_hop_address=self.pg7.remote_ip4n,
1598 next_hop_sw_if_index=self.pg7.sw_if_index)
1599 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1600 dst_address_length=32,
1601 next_hop_address=self.pg8.remote_ip4n,
1602 next_hop_sw_if_index=self.pg8.sw_if_index)
1604 self.snat_add_address(self.snat_addr)
1605 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1606 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1610 pkts = self.create_stream_in(self.pg7, self.pg8)
1611 self.pg7.add_stream(pkts)
1612 self.pg_enable_capture(self.pg_interfaces)
1614 capture = self.pg8.get_capture(len(pkts))
1615 self.verify_capture_out(capture)
1618 pkts = self.create_stream_out(self.pg8, self.snat_addr)
1619 self.pg8.add_stream(pkts)
1620 self.pg_enable_capture(self.pg_interfaces)
1622 capture = self.pg7.get_capture(len(pkts))
1623 self.verify_capture_in(capture, self.pg7)
1625 def test_static_ipless_interfaces(self):
1626 """ SNAT 1:1 NAT interfaces without configured ip """
1628 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1629 self.pg7.remote_mac,
1630 self.pg7.remote_ip4n,
1632 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1633 self.pg8.remote_mac,
1634 self.pg8.remote_ip4n,
1637 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1638 dst_address_length=32,
1639 next_hop_address=self.pg7.remote_ip4n,
1640 next_hop_sw_if_index=self.pg7.sw_if_index)
1641 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1642 dst_address_length=32,
1643 next_hop_address=self.pg8.remote_ip4n,
1644 next_hop_sw_if_index=self.pg8.sw_if_index)
1646 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1647 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1648 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1652 pkts = self.create_stream_out(self.pg8)
1653 self.pg8.add_stream(pkts)
1654 self.pg_enable_capture(self.pg_interfaces)
1656 capture = self.pg7.get_capture(len(pkts))
1657 self.verify_capture_in(capture, self.pg7)
1660 pkts = self.create_stream_in(self.pg7, self.pg8)
1661 self.pg7.add_stream(pkts)
1662 self.pg_enable_capture(self.pg_interfaces)
1664 capture = self.pg8.get_capture(len(pkts))
1665 self.verify_capture_out(capture, self.snat_addr, True)
1667 def test_static_with_port_ipless_interfaces(self):
1668 """ SNAT 1:1 NAT with port interfaces without configured ip """
1670 self.tcp_port_out = 30606
1671 self.udp_port_out = 30607
1672 self.icmp_id_out = 30608
1674 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1675 self.pg7.remote_mac,
1676 self.pg7.remote_ip4n,
1678 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1679 self.pg8.remote_mac,
1680 self.pg8.remote_ip4n,
1683 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1684 dst_address_length=32,
1685 next_hop_address=self.pg7.remote_ip4n,
1686 next_hop_sw_if_index=self.pg7.sw_if_index)
1687 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1688 dst_address_length=32,
1689 next_hop_address=self.pg8.remote_ip4n,
1690 next_hop_sw_if_index=self.pg8.sw_if_index)
1692 self.snat_add_address(self.snat_addr)
1693 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1694 self.tcp_port_in, self.tcp_port_out,
1695 proto=IP_PROTOS.tcp)
1696 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1697 self.udp_port_in, self.udp_port_out,
1698 proto=IP_PROTOS.udp)
1699 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1700 self.icmp_id_in, self.icmp_id_out,
1701 proto=IP_PROTOS.icmp)
1702 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1703 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1707 pkts = self.create_stream_out(self.pg8)
1708 self.pg8.add_stream(pkts)
1709 self.pg_enable_capture(self.pg_interfaces)
1711 capture = self.pg7.get_capture(len(pkts))
1712 self.verify_capture_in(capture, self.pg7)
1715 pkts = self.create_stream_in(self.pg7, self.pg8)
1716 self.pg7.add_stream(pkts)
1717 self.pg_enable_capture(self.pg_interfaces)
1719 capture = self.pg8.get_capture(len(pkts))
1720 self.verify_capture_out(capture)
1723 super(TestSNAT, self).tearDown()
1724 if not self.vpp_dead:
1725 self.logger.info(self.vapi.cli("show snat verbose"))
1729 class TestDeterministicNAT(MethodHolder):
1730 """ Deterministic NAT Test Cases """
1733 def setUpConstants(cls):
1734 super(TestDeterministicNAT, cls).setUpConstants()
1735 cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1738 def setUpClass(cls):
1739 super(TestDeterministicNAT, cls).setUpClass()
1742 cls.tcp_port_in = 6303
1743 cls.tcp_external_port = 6303
1744 cls.udp_port_in = 6304
1745 cls.udp_external_port = 6304
1746 cls.icmp_id_in = 6305
1747 cls.snat_addr = '10.0.0.3'
1749 cls.create_pg_interfaces(range(3))
1750 cls.interfaces = list(cls.pg_interfaces)
1752 for i in cls.interfaces:
1757 cls.pg0.generate_remote_hosts(2)
1758 cls.pg0.configure_ipv4_neighbors()
1761 super(TestDeterministicNAT, cls).tearDownClass()
1764 def create_stream_in(self, in_if, out_if, ttl=64):
1766 Create packet stream for inside network
1768 :param in_if: Inside interface
1769 :param out_if: Outside interface
1770 :param ttl: TTL of generated packets
1774 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1775 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1776 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1780 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1781 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1782 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
1786 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1787 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1788 ICMP(id=self.icmp_id_in, type='echo-request'))
1793 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
1795 Create packet stream for outside network
1797 :param out_if: Outside interface
1798 :param dst_ip: Destination IP address (Default use global SNAT address)
1799 :param ttl: TTL of generated packets
1802 dst_ip = self.snat_addr
1805 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1806 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1807 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
1811 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1812 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1813 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
1817 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1818 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1819 ICMP(id=self.icmp_external_id, type='echo-reply'))
1824 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
1826 Verify captured packets on outside network
1828 :param capture: Captured packets
1829 :param nat_ip: Translated IP address (Default use global SNAT address)
1830 :param same_port: Sorce port number is not translated (Default False)
1831 :param packet_num: Expected number of packets (Default 3)
1834 nat_ip = self.snat_addr
1835 self.assertEqual(packet_num, len(capture))
1836 for packet in capture:
1838 self.assertEqual(packet[IP].src, nat_ip)
1839 if packet.haslayer(TCP):
1840 self.tcp_port_out = packet[TCP].sport
1841 elif packet.haslayer(UDP):
1842 self.udp_port_out = packet[UDP].sport
1844 self.icmp_external_id = packet[ICMP].id
1846 self.logger.error(ppp("Unexpected or invalid packet "
1847 "(outside network):", packet))
1850 def initiate_tcp_session(self, in_if, out_if):
1852 Initiates TCP session
1854 :param in_if: Inside interface
1855 :param out_if: Outside interface
1858 # SYN packet in->out
1859 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1860 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1861 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1864 self.pg_enable_capture(self.pg_interfaces)
1866 capture = out_if.get_capture(1)
1868 self.tcp_port_out = p[TCP].sport
1870 # SYN + ACK packet out->in
1871 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
1872 IP(src=out_if.remote_ip4, dst=self.snat_addr) /
1873 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1875 out_if.add_stream(p)
1876 self.pg_enable_capture(self.pg_interfaces)
1878 in_if.get_capture(1)
1880 # ACK packet in->out
1881 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1882 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1883 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1886 self.pg_enable_capture(self.pg_interfaces)
1888 out_if.get_capture(1)
1891 self.logger.error("TCP 3 way handshake failed")
1894 def verify_ipfix_max_entries_per_user(self, data):
1896 Verify IPFIX maximum entries per user exceeded event
1898 :param data: Decoded IPFIX data records
1900 self.assertEqual(1, len(data))
1903 self.assertEqual(ord(record[230]), 13)
1904 # natQuotaExceededEvent
1905 self.assertEqual('\x03\x00\x00\x00', record[466])
1907 self.assertEqual(self.pg0.remote_ip4n, record[8])
1909 def test_deterministic_mode(self):
1910 """ S-NAT run deterministic mode """
1911 in_addr = '172.16.255.0'
1912 out_addr = '172.17.255.50'
1913 in_addr_t = '172.16.255.20'
1914 in_addr_n = socket.inet_aton(in_addr)
1915 out_addr_n = socket.inet_aton(out_addr)
1916 in_addr_t_n = socket.inet_aton(in_addr_t)
1920 snat_config = self.vapi.snat_show_config()
1921 self.assertEqual(1, snat_config.deterministic)
1923 self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
1925 rep1 = self.vapi.snat_det_forward(in_addr_t_n)
1926 self.assertEqual(rep1.out_addr[:4], out_addr_n)
1927 rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
1928 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
1930 deterministic_mappings = self.vapi.snat_det_map_dump()
1931 self.assertEqual(len(deterministic_mappings), 1)
1932 dsm = deterministic_mappings[0]
1933 self.assertEqual(in_addr_n, dsm.in_addr[:4])
1934 self.assertEqual(in_plen, dsm.in_plen)
1935 self.assertEqual(out_addr_n, dsm.out_addr[:4])
1936 self.assertEqual(out_plen, dsm.out_plen)
1939 deterministic_mappings = self.vapi.snat_det_map_dump()
1940 self.assertEqual(len(deterministic_mappings), 0)
1942 def test_set_timeouts(self):
1943 """ Set deterministic NAT timeouts """
1944 timeouts_before = self.vapi.snat_det_get_timeouts()
1946 self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
1947 timeouts_before.tcp_established + 10,
1948 timeouts_before.tcp_transitory + 10,
1949 timeouts_before.icmp + 10)
1951 timeouts_after = self.vapi.snat_det_get_timeouts()
1953 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
1954 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
1955 self.assertNotEqual(timeouts_before.tcp_established,
1956 timeouts_after.tcp_established)
1957 self.assertNotEqual(timeouts_before.tcp_transitory,
1958 timeouts_after.tcp_transitory)
1960 def test_det_in(self):
1961 """ CGNAT translation test (TCP, UDP, ICMP) """
1963 nat_ip = "10.0.0.10"
1965 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1967 socket.inet_aton(nat_ip),
1969 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1970 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1974 pkts = self.create_stream_in(self.pg0, self.pg1)
1975 self.pg0.add_stream(pkts)
1976 self.pg_enable_capture(self.pg_interfaces)
1978 capture = self.pg1.get_capture(len(pkts))
1979 self.verify_capture_out(capture, nat_ip)
1982 pkts = self.create_stream_out(self.pg1, nat_ip)
1983 self.pg1.add_stream(pkts)
1984 self.pg_enable_capture(self.pg_interfaces)
1986 capture = self.pg0.get_capture(len(pkts))
1987 self.verify_capture_in(capture, self.pg0)
1990 sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
1991 self.assertEqual(len(sessions), 3)
1995 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
1996 self.assertEqual(s.in_port, self.tcp_port_in)
1997 self.assertEqual(s.out_port, self.tcp_port_out)
1998 self.assertEqual(s.ext_port, self.tcp_external_port)
2002 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2003 self.assertEqual(s.in_port, self.udp_port_in)
2004 self.assertEqual(s.out_port, self.udp_port_out)
2005 self.assertEqual(s.ext_port, self.udp_external_port)
2009 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2010 self.assertEqual(s.in_port, self.icmp_id_in)
2011 self.assertEqual(s.out_port, self.icmp_external_id)
2013 def test_multiple_users(self):
2014 """ CGNAT multiple users """
2016 nat_ip = "10.0.0.10"
2018 external_port = 6303
2020 host0 = self.pg0.remote_hosts[0]
2021 host1 = self.pg0.remote_hosts[1]
2023 self.vapi.snat_add_det_map(host0.ip4n,
2025 socket.inet_aton(nat_ip),
2027 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2028 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2032 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2033 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2034 TCP(sport=port_in, dport=external_port))
2035 self.pg0.add_stream(p)
2036 self.pg_enable_capture(self.pg_interfaces)
2038 capture = self.pg1.get_capture(1)
2043 self.assertEqual(ip.src, nat_ip)
2044 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2045 self.assertEqual(tcp.dport, external_port)
2046 port_out0 = tcp.sport
2048 self.logger.error(ppp("Unexpected or invalid packet:", p))
2052 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2053 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2054 TCP(sport=port_in, dport=external_port))
2055 self.pg0.add_stream(p)
2056 self.pg_enable_capture(self.pg_interfaces)
2058 capture = self.pg1.get_capture(1)
2063 self.assertEqual(ip.src, nat_ip)
2064 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2065 self.assertEqual(tcp.dport, external_port)
2066 port_out1 = tcp.sport
2068 self.logger.error(ppp("Unexpected or invalid packet:", p))
2071 dms = self.vapi.snat_det_map_dump()
2072 self.assertEqual(1, len(dms))
2073 self.assertEqual(2, dms[0].ses_num)
2076 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2077 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2078 TCP(sport=external_port, dport=port_out0))
2079 self.pg1.add_stream(p)
2080 self.pg_enable_capture(self.pg_interfaces)
2082 capture = self.pg0.get_capture(1)
2087 self.assertEqual(ip.src, self.pg1.remote_ip4)
2088 self.assertEqual(ip.dst, host0.ip4)
2089 self.assertEqual(tcp.dport, port_in)
2090 self.assertEqual(tcp.sport, external_port)
2092 self.logger.error(ppp("Unexpected or invalid packet:", p))
2096 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2097 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2098 TCP(sport=external_port, dport=port_out1))
2099 self.pg1.add_stream(p)
2100 self.pg_enable_capture(self.pg_interfaces)
2102 capture = self.pg0.get_capture(1)
2107 self.assertEqual(ip.src, self.pg1.remote_ip4)
2108 self.assertEqual(ip.dst, host1.ip4)
2109 self.assertEqual(tcp.dport, port_in)
2110 self.assertEqual(tcp.sport, external_port)
2112 self.logger.error(ppp("Unexpected or invalid packet", p))
2115 # session close api test
2116 self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2118 self.pg1.remote_ip4n,
2120 dms = self.vapi.snat_det_map_dump()
2121 self.assertEqual(dms[0].ses_num, 1)
2123 self.vapi.snat_det_close_session_in(host0.ip4n,
2125 self.pg1.remote_ip4n,
2127 dms = self.vapi.snat_det_map_dump()
2128 self.assertEqual(dms[0].ses_num, 0)
2130 def test_tcp_session_close_detection_in(self):
2131 """ CGNAT TCP session close initiated from inside network """
2132 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2134 socket.inet_aton(self.snat_addr),
2136 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2137 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2140 self.initiate_tcp_session(self.pg0, self.pg1)
2142 # close the session from inside
2144 # FIN packet in -> out
2145 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2146 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2147 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2149 self.pg0.add_stream(p)
2150 self.pg_enable_capture(self.pg_interfaces)
2152 self.pg1.get_capture(1)
2156 # ACK packet out -> in
2157 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2158 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2159 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2163 # FIN packet out -> in
2164 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2165 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2166 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2170 self.pg1.add_stream(pkts)
2171 self.pg_enable_capture(self.pg_interfaces)
2173 self.pg0.get_capture(2)
2175 # ACK packet in -> out
2176 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2177 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2178 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2180 self.pg0.add_stream(p)
2181 self.pg_enable_capture(self.pg_interfaces)
2183 self.pg1.get_capture(1)
2185 # Check if snat closed the session
2186 dms = self.vapi.snat_det_map_dump()
2187 self.assertEqual(0, dms[0].ses_num)
2189 self.logger.error("TCP session termination failed")
2192 def test_tcp_session_close_detection_out(self):
2193 """ CGNAT TCP session close initiated from outside network """
2194 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2196 socket.inet_aton(self.snat_addr),
2198 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2199 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2202 self.initiate_tcp_session(self.pg0, self.pg1)
2204 # close the session from outside
2206 # FIN packet out -> in
2207 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2208 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2209 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2211 self.pg1.add_stream(p)
2212 self.pg_enable_capture(self.pg_interfaces)
2214 self.pg0.get_capture(1)
2218 # ACK packet in -> out
2219 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2220 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2221 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2225 # ACK packet in -> out
2226 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2227 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2228 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2232 self.pg0.add_stream(pkts)
2233 self.pg_enable_capture(self.pg_interfaces)
2235 self.pg1.get_capture(2)
2237 # ACK packet out -> in
2238 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2239 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2240 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2242 self.pg1.add_stream(p)
2243 self.pg_enable_capture(self.pg_interfaces)
2245 self.pg0.get_capture(1)
2247 # Check if snat closed the session
2248 dms = self.vapi.snat_det_map_dump()
2249 self.assertEqual(0, dms[0].ses_num)
2251 self.logger.error("TCP session termination failed")
2254 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2255 def test_session_timeout(self):
2256 """ CGNAT session timeouts """
2257 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2259 socket.inet_aton(self.snat_addr),
2261 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2262 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2265 self.initiate_tcp_session(self.pg0, self.pg1)
2266 self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2267 pkts = self.create_stream_in(self.pg0, self.pg1)
2268 self.pg0.add_stream(pkts)
2269 self.pg_enable_capture(self.pg_interfaces)
2271 capture = self.pg1.get_capture(len(pkts))
2274 dms = self.vapi.snat_det_map_dump()
2275 self.assertEqual(0, dms[0].ses_num)
2277 def test_session_limit_per_user(self):
2278 """ CGNAT maximum 1000 sessions per user should be created """
2279 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2281 socket.inet_aton(self.snat_addr),
2283 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2284 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2286 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2287 src_address=self.pg2.local_ip4n,
2289 template_interval=10)
2290 self.vapi.snat_ipfix()
2293 for port in range(1025, 2025):
2294 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2295 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2296 UDP(sport=port, dport=port))
2299 self.pg0.add_stream(pkts)
2300 self.pg_enable_capture(self.pg_interfaces)
2302 capture = self.pg1.get_capture(len(pkts))
2304 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2305 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2306 UDP(sport=3001, dport=3002))
2307 self.pg0.add_stream(p)
2308 self.pg_enable_capture(self.pg_interfaces)
2310 capture = self.pg1.assert_nothing_captured()
2312 # verify ICMP error packet
2313 capture = self.pg0.get_capture(1)
2315 self.assertTrue(p.haslayer(ICMP))
2317 self.assertEqual(icmp.type, 3)
2318 self.assertEqual(icmp.code, 1)
2319 self.assertTrue(icmp.haslayer(IPerror))
2320 inner_ip = icmp[IPerror]
2321 self.assertEqual(inner_ip[UDPerror].sport, 3001)
2322 self.assertEqual(inner_ip[UDPerror].dport, 3002)
2324 dms = self.vapi.snat_det_map_dump()
2326 self.assertEqual(1000, dms[0].ses_num)
2328 # verify IPFIX logging
2329 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2330 capture = self.pg2.get_capture(2)
2331 ipfix = IPFIXDecoder()
2332 # first load template
2334 self.assertTrue(p.haslayer(IPFIX))
2335 if p.haslayer(Template):
2336 ipfix.add_template(p.getlayer(Template))
2337 # verify events in data set
2339 if p.haslayer(Data):
2340 data = ipfix.decode_data_set(p.getlayer(Set))
2341 self.verify_ipfix_max_entries_per_user(data)
2343 def clear_snat(self):
2345 Clear SNAT configuration.
2347 self.vapi.snat_ipfix(enable=0)
2348 self.vapi.snat_det_set_timeouts()
2349 deterministic_mappings = self.vapi.snat_det_map_dump()
2350 for dsm in deterministic_mappings:
2351 self.vapi.snat_add_det_map(dsm.in_addr,
2357 interfaces = self.vapi.snat_interface_dump()
2358 for intf in interfaces:
2359 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2364 super(TestDeterministicNAT, self).tearDown()
2365 if not self.vpp_dead:
2366 self.logger.info(self.vapi.cli("show snat detail"))
2370 class TestNAT64(MethodHolder):
2371 """ NAT64 Test Cases """
2374 def setUpClass(cls):
2375 super(TestNAT64, cls).setUpClass()
2378 cls.tcp_port_in = 6303
2379 cls.tcp_port_out = 6303
2380 cls.udp_port_in = 6304
2381 cls.udp_port_out = 6304
2382 cls.icmp_id_in = 6305
2383 cls.icmp_id_out = 6305
2384 cls.nat_addr = '10.0.0.3'
2385 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2387 cls.create_pg_interfaces(range(2))
2388 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2389 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2391 for i in cls.ip6_interfaces:
2396 for i in cls.ip4_interfaces:
2402 super(TestNAT64, cls).tearDownClass()
2405 def test_pool(self):
2406 """ Add/delete address to NAT64 pool """
2407 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
2409 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
2411 addresses = self.vapi.nat64_pool_addr_dump()
2412 self.assertEqual(len(addresses), 1)
2413 self.assertEqual(addresses[0].address, nat_addr)
2415 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
2417 addresses = self.vapi.nat64_pool_addr_dump()
2418 self.assertEqual(len(addresses), 0)
2420 def test_interface(self):
2421 """ Enable/disable NAT64 feature on the interface """
2422 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2423 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2425 interfaces = self.vapi.nat64_interface_dump()
2426 self.assertEqual(len(interfaces), 2)
2429 for intf in interfaces:
2430 if intf.sw_if_index == self.pg0.sw_if_index:
2431 self.assertEqual(intf.is_inside, 1)
2433 elif intf.sw_if_index == self.pg1.sw_if_index:
2434 self.assertEqual(intf.is_inside, 0)
2436 self.assertTrue(pg0_found)
2437 self.assertTrue(pg1_found)
2439 features = self.vapi.cli("show interface features pg0")
2440 self.assertNotEqual(features.find('nat64-in2out'), -1)
2441 features = self.vapi.cli("show interface features pg1")
2442 self.assertNotEqual(features.find('nat64-out2in'), -1)
2444 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2445 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2447 interfaces = self.vapi.nat64_interface_dump()
2448 self.assertEqual(len(interfaces), 0)
2450 def test_static_bib(self):
2451 """ Add/delete static BIB entry """
2452 in_addr = socket.inet_pton(socket.AF_INET6,
2453 '2001:db8:85a3::8a2e:370:7334')
2454 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
2457 proto = IP_PROTOS.tcp
2459 self.vapi.nat64_add_del_static_bib(in_addr,
2464 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2469 self.assertEqual(bibe.i_addr, in_addr)
2470 self.assertEqual(bibe.o_addr, out_addr)
2471 self.assertEqual(bibe.i_port, in_port)
2472 self.assertEqual(bibe.o_port, out_port)
2473 self.assertEqual(static_bib_num, 1)
2475 self.vapi.nat64_add_del_static_bib(in_addr,
2481 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2486 self.assertEqual(static_bib_num, 0)
2488 def test_set_timeouts(self):
2489 """ Set NAT64 timeouts """
2490 # verify default values
2491 timeouts = self.vapi.nat64_get_timeouts()
2492 self.assertEqual(timeouts.udp, 300)
2493 self.assertEqual(timeouts.icmp, 60)
2494 self.assertEqual(timeouts.tcp_trans, 240)
2495 self.assertEqual(timeouts.tcp_est, 7440)
2496 self.assertEqual(timeouts.tcp_incoming_syn, 6)
2498 # set and verify custom values
2499 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
2500 tcp_est=7450, tcp_incoming_syn=10)
2501 timeouts = self.vapi.nat64_get_timeouts()
2502 self.assertEqual(timeouts.udp, 200)
2503 self.assertEqual(timeouts.icmp, 30)
2504 self.assertEqual(timeouts.tcp_trans, 250)
2505 self.assertEqual(timeouts.tcp_est, 7450)
2506 self.assertEqual(timeouts.tcp_incoming_syn, 10)
2508 def test_dynamic(self):
2509 """ NAT64 dynamic translation test """
2510 self.tcp_port_in = 6303
2511 self.udp_port_in = 6304
2512 self.icmp_id_in = 6305
2514 ses_num_start = self.nat64_get_ses_num()
2516 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2518 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2519 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2522 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2523 self.pg0.add_stream(pkts)
2524 self.pg_enable_capture(self.pg_interfaces)
2526 capture = self.pg1.get_capture(3)
2527 self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
2528 dst_ip=self.pg1.remote_ip4)
2531 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2532 self.pg1.add_stream(pkts)
2533 self.pg_enable_capture(self.pg_interfaces)
2535 capture = self.pg0.get_capture(3)
2536 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2537 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2540 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2541 self.pg0.add_stream(pkts)
2542 self.pg_enable_capture(self.pg_interfaces)
2544 capture = self.pg1.get_capture(3)
2545 self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
2546 dst_ip=self.pg1.remote_ip4)
2549 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2550 self.pg1.add_stream(pkts)
2551 self.pg_enable_capture(self.pg_interfaces)
2553 capture = self.pg0.get_capture(3)
2554 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2555 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2557 ses_num_end = self.nat64_get_ses_num()
2559 self.assertEqual(ses_num_end - ses_num_start, 3)
2561 def test_static(self):
2562 """ NAT64 static translation test """
2563 self.tcp_port_in = 60303
2564 self.udp_port_in = 60304
2565 self.icmp_id_in = 60305
2566 self.tcp_port_out = 60303
2567 self.udp_port_out = 60304
2568 self.icmp_id_out = 60305
2570 ses_num_start = self.nat64_get_ses_num()
2572 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2574 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2575 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2577 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2582 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2587 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2594 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2595 self.pg0.add_stream(pkts)
2596 self.pg_enable_capture(self.pg_interfaces)
2598 capture = self.pg1.get_capture(3)
2599 self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
2600 dst_ip=self.pg1.remote_ip4, same_port=True)
2603 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2604 self.pg1.add_stream(pkts)
2605 self.pg_enable_capture(self.pg_interfaces)
2607 capture = self.pg0.get_capture(3)
2608 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2609 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2611 ses_num_end = self.nat64_get_ses_num()
2613 self.assertEqual(ses_num_end - ses_num_start, 3)
2615 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2616 def test_session_timeout(self):
2617 """ NAT64 session timeout """
2618 self.icmp_id_in = 1234
2619 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2621 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2622 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2623 self.vapi.nat64_set_timeouts(icmp=5)
2625 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2626 self.pg0.add_stream(pkts)
2627 self.pg_enable_capture(self.pg_interfaces)
2629 capture = self.pg1.get_capture(3)
2631 ses_num_before_timeout = self.nat64_get_ses_num()
2635 # ICMP session after timeout
2636 ses_num_after_timeout = self.nat64_get_ses_num()
2637 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
2639 def test_icmp_error(self):
2640 """ NAT64 ICMP Error message translation """
2641 self.tcp_port_in = 6303
2642 self.udp_port_in = 6304
2643 self.icmp_id_in = 6305
2645 ses_num_start = self.nat64_get_ses_num()
2647 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2649 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2650 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2652 # send some packets to create sessions
2653 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2654 self.pg0.add_stream(pkts)
2655 self.pg_enable_capture(self.pg_interfaces)
2657 capture_ip4 = self.pg1.get_capture(len(pkts))
2658 self.verify_capture_out(capture_ip4, packet_num=3,
2659 nat_ip=self.nat_addr,
2660 dst_ip=self.pg1.remote_ip4)
2662 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2663 self.pg1.add_stream(pkts)
2664 self.pg_enable_capture(self.pg_interfaces)
2666 capture_ip6 = self.pg0.get_capture(len(pkts))
2667 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2668 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
2669 self.pg0.remote_ip6)
2672 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2673 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
2674 ICMPv6DestUnreach(code=1) /
2675 packet[IPv6] for packet in capture_ip6]
2676 self.pg0.add_stream(pkts)
2677 self.pg_enable_capture(self.pg_interfaces)
2679 capture = self.pg1.get_capture(len(pkts))
2680 for packet in capture:
2682 self.assertEqual(packet[IP].src, self.nat_addr)
2683 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2684 self.assertEqual(packet[ICMP].type, 3)
2685 self.assertEqual(packet[ICMP].code, 13)
2686 inner = packet[IPerror]
2687 self.assertEqual(inner.src, self.pg1.remote_ip4)
2688 self.assertEqual(inner.dst, self.nat_addr)
2689 if inner.haslayer(TCPerror):
2690 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
2691 elif inner.haslayer(UDPerror):
2692 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
2694 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
2696 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2700 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2701 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2702 ICMP(type=3, code=13) /
2703 packet[IP] for packet in capture_ip4]
2704 self.pg1.add_stream(pkts)
2705 self.pg_enable_capture(self.pg_interfaces)
2707 capture = self.pg0.get_capture(len(pkts))
2708 for packet in capture:
2710 self.assertEqual(packet[IPv6].src, ip.src)
2711 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
2712 icmp = packet[ICMPv6DestUnreach]
2713 self.assertEqual(icmp.code, 1)
2714 inner = icmp[IPerror6]
2715 self.assertEqual(inner.src, self.pg0.remote_ip6)
2716 self.assertEqual(inner.dst, ip.src)
2717 if inner.haslayer(TCPerror):
2718 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
2719 elif inner.haslayer(UDPerror):
2720 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
2722 self.assertEqual(inner[ICMPv6EchoRequest].id,
2725 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2728 def nat64_get_ses_num(self):
2730 Return number of active NAT64 sessions.
2733 st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
2735 st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
2737 st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
2741 def clear_nat64(self):
2743 Clear NAT64 configuration.
2745 self.vapi.nat64_set_timeouts()
2747 interfaces = self.vapi.nat64_interface_dump()
2748 for intf in interfaces:
2749 self.vapi.nat64_add_del_interface(intf.sw_if_index,
2753 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2756 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
2764 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
2767 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
2775 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
2778 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
2786 adresses = self.vapi.nat64_pool_addr_dump()
2787 for addr in adresses:
2788 self.vapi.nat64_add_del_pool_addr_range(addr.address,
2793 super(TestNAT64, self).tearDown()
2794 if not self.vpp_dead:
2795 self.logger.info(self.vapi.cli("show nat64 pool"))
2796 self.logger.info(self.vapi.cli("show nat64 interfaces"))
2797 self.logger.info(self.vapi.cli("show nat64 bib tcp"))
2798 self.logger.info(self.vapi.cli("show nat64 bib udp"))
2799 self.logger.info(self.vapi.cli("show nat64 bib icmp"))
2800 self.logger.info(self.vapi.cli("show nat64 session table tcp"))
2801 self.logger.info(self.vapi.cli("show nat64 session table udp"))
2802 self.logger.info(self.vapi.cli("show nat64 session table icmp"))
2805 if __name__ == '__main__':
2806 unittest.main(testRunner=VppTestRunner)