7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.l2 import Ether, ARP
11 from scapy.data import IP_PROTOS
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from time import sleep
# Shared base class for the SNAT tests: the methods that follow build packet
# streams and verify captures; TestSNAT derives from it.
17 class MethodHolder(VppTestCase):
18 """ SNAT create capture and verify method holder """
# NOTE(review): the def lines owning the two super() calls below are not
# visible in this listing; judging by the calls they are setUpClass and
# tearDown overrides that delegate to the parent class - confirm.
22 super(MethodHolder, cls).setUpClass()
25 super(MethodHolder, self).tearDown()
# Helper: builds the inside -> outside test stream. The visible statements
# construct three packets from in_if's remote host to out_if's remote host
# (TCP, UDP and ICMP echo-request), each with the requested TTL.
27 def create_stream_in(self, in_if, out_if, ttl=64):
29 Create packet stream for inside network
31 :param in_if: Inside interface
32 :param out_if: Outside interface
33 :param ttl: TTL of generated packets
# TCP packet sourced from the inside TCP port under test.
37 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
38 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
39 TCP(sport=self.tcp_port_in))
# UDP packet sourced from the inside UDP port under test.
43 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
44 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
45 UDP(sport=self.udp_port_in))
# ICMP echo-request carrying the inside ICMP identifier.
49 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
50 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
51 ICMP(id=self.icmp_id_in, type='echo-request'))
# Helper: builds the outside -> inside test stream. The visible statements
# construct TCP, UDP and ICMP echo-reply packets from out_if's remote host
# to dst_ip, aimed at the translated (outside) ports / ICMP id kept on self.
56 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
58 Create packet stream for outside network
60 :param out_if: Outside interface
61 :param dst_ip: Destination IP address (Default use global SNAT address)
62 :param ttl: TTL of generated packets
# Default destination is the global SNAT address (the guarding condition
# is on a line not visible in this listing).
65 dst_ip = self.snat_addr
# TCP packet to the translated TCP port.
68 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
69 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
70 TCP(dport=self.tcp_port_out))
# UDP packet to the translated UDP port.
74 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
75 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
76 UDP(dport=self.udp_port_out))
# ICMP echo-reply carrying the translated ICMP identifier.
80 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
81 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
82 ICMP(id=self.icmp_id_out, type='echo-reply'))
# Helper: checks packets captured on the outside interface after in2out
# translation. Besides asserting, it stores the observed source ports and
# ICMP id in self.tcp_port_out / self.udp_port_out / self.icmp_id_out, which
# create_stream_out() later uses as destination ports / id.
87 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
90 Verify captured packets on outside network
92 :param capture: Captured packets
93 :param nat_ip: Translated IP address (Default use global SNAT address)
94 :param same_port: Source port number is not translated (Default False)
95 :param packet_num: Expected number of packets (Default 3)
# Default expected source address is the global SNAT address.
98 nat_ip = self.snat_addr
99 self.assertEqual(packet_num, len(capture))
100 for packet in capture:
102 self.assertEqual(packet[IP].src, nat_ip)
103 if packet.haslayer(TCP):
# Two port checks follow (equal / not-equal to the inside port); the
# same_port branching lines are not visible in this listing.
105 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
108 packet[TCP].sport, self.tcp_port_in)
109 self.tcp_port_out = packet[TCP].sport
110 elif packet.haslayer(UDP):
112 self.assertEqual(packet[UDP].sport, self.udp_port_in)
115 packet[UDP].sport, self.udp_port_in)
116 self.udp_port_out = packet[UDP].sport
# Remaining case: ICMP, matched on the identifier instead of a port.
119 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
121 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
122 self.icmp_id_out = packet[ICMP].id
# Failure path: dump the offending packet to the test log.
124 self.logger.error(ppp("Unexpected or invalid packet "
125 "(outside network):", packet))
# Helper: checks packets captured on the inside interface after out2in
# translation: destination address must be the inside host and the
# destination port / ICMP id must be the original inside value.
128 def verify_capture_in(self, capture, in_if, packet_num=3):
130 Verify captured packets on inside network
132 :param capture: Captured packets
133 :param in_if: Inside interface
134 :param packet_num: Expected number of packets (Default 3)
136 self.assertEqual(packet_num, len(capture))
137 for packet in capture:
139 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
140 if packet.haslayer(TCP):
141 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
142 elif packet.haslayer(UDP):
143 self.assertEqual(packet[UDP].dport, self.udp_port_in)
# Remaining case: ICMP, matched on the identifier.
145 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
# Failure path: dump the offending packet to the test log.
147 self.logger.error(ppp("Unexpected or invalid packet "
148 "(inside network):", packet))
# Helper: checks that captured packets passed through untouched: source and
# destination addresses are the ingress/egress remote hosts and the source
# port / ICMP id still equals the inside value. No packet count is asserted
# in the visible lines.
151 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
153 Verify captured packet that don't have to be translated
155 :param capture: Captured packets
156 :param ingress_if: Ingress interface
157 :param egress_if: Egress interface
159 for packet in capture:
161 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
162 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
163 if packet.haslayer(TCP):
164 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
165 elif packet.haslayer(UDP):
166 self.assertEqual(packet[UDP].sport, self.udp_port_in)
# Remaining case: ICMP, matched on the identifier.
168 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
# Failure path: dump the offending packet to the test log.
170 self.logger.error(ppp("Unexpected or invalid packet "
171 "(inside network):", packet))
# Helper: checks ICMP error packets captured on the outside interface. Each
# packet must come from src_ip, be ICMP of the expected type and embed the
# offending packet (IPerror); the embedded destination port / ICMP id is
# compared against the outside (translated) values.
174 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
175 packet_num=3, icmp_type=11):
177 Verify captured packets with ICMP errors on outside network
179 :param capture: Captured packets
180 :param src_ip: Translated IP address or IP address of VPP
181 (Default use global SNAT address)
182 :param packet_num: Expected number of packets (Default 3)
183 :param icmp_type: Type of error ICMP packet
184 we are expecting (Default 11)
# Default expected source address is the global SNAT address.
187 src_ip = self.snat_addr
188 self.assertEqual(packet_num, len(capture))
189 for packet in capture:
191 self.assertEqual(packet[IP].src, src_ip)
192 self.assertTrue(packet.haslayer(ICMP))
# `icmp` is bound on a line not visible in this listing (presumably the
# packet's ICMP layer - confirm).
194 self.assertEqual(icmp.type, icmp_type)
195 self.assertTrue(icmp.haslayer(IPerror))
196 inner_ip = icmp[IPerror]
197 if inner_ip.haslayer(TCPerror):
198 self.assertEqual(inner_ip[TCPerror].dport,
200 elif inner_ip.haslayer(UDPerror):
201 self.assertEqual(inner_ip[UDPerror].dport,
204 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
# Failure path: dump the offending packet to the test log.
206 self.logger.error(ppp("Unexpected or invalid packet "
207 "(outside network):", packet))
# Helper: checks ICMP error packets captured on the inside interface. Each
# packet must be addressed to the inside host, be ICMP of the expected type
# and embed the offending packet (IPerror); the embedded source port / ICMP
# id is compared against the inside values.
210 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
213 Verify captured packets with ICMP errors on inside network
215 :param capture: Captured packets
216 :param in_if: Inside interface
217 :param packet_num: Expected number of packets (Default 3)
218 :param icmp_type: Type of error ICMP packet
219 we are expecting (Default 11)
221 self.assertEqual(packet_num, len(capture))
222 for packet in capture:
224 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
225 self.assertTrue(packet.haslayer(ICMP))
# `icmp` is bound on a line not visible in this listing (presumably the
# packet's ICMP layer - confirm).
227 self.assertEqual(icmp.type, icmp_type)
228 self.assertTrue(icmp.haslayer(IPerror))
229 inner_ip = icmp[IPerror]
230 if inner_ip.haslayer(TCPerror):
231 self.assertEqual(inner_ip[TCPerror].sport,
233 elif inner_ip.haslayer(UDPerror):
234 self.assertEqual(inner_ip[UDPerror].sport,
237 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
# Failure path: dump the offending packet to the test log.
239 self.logger.error(ppp("Unexpected or invalid packet "
240 "(inside network):", packet))
# Helper: checks decoded IPFIX records for NAT44 session events. Expects
# exactly 6 records, of which 3 are counted as session creates and 3 as
# session deletes. Records are indexed by numeric field id.
243 def verify_ipfix_nat44_ses(self, data):
245 Verify IPFIX NAT44 session create/delete event
247 :param data: Decoded IPFIX data records
249 nat44_ses_create_num = 0
250 nat44_ses_delete_num = 0
251 self.assertEqual(6, len(data))
# Field 230 is the event discriminator: 4 is counted as a create, the other
# accepted value (5) as a delete.
254 self.assertIn(ord(record[230]), [4, 5])
255 if ord(record[230]) == 4:
256 nat44_ses_create_num += 1
258 nat44_ses_delete_num += 1
# Field 8 must equal the packed IPv4 address of pg0's remote host.
260 self.assertEqual(self.pg0.remote_ip4n, record[8])
261 # postNATSourceIPv4Address
262 self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
# Field 234 must be a zero 32-bit value in network byte order.
265 self.assertEqual(struct.pack("!I", 0), record[234])
266 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
# Field 4 selects the protocol; ports / ICMP ids are compared as packed
# 16-bit network-order values against the inside and outside numbers on self.
267 if IP_PROTOS.icmp == ord(record[4]):
268 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
269 self.assertEqual(struct.pack("!H", self.icmp_id_out),
271 elif IP_PROTOS.tcp == ord(record[4]):
272 self.assertEqual(struct.pack("!H", self.tcp_port_in),
274 self.assertEqual(struct.pack("!H", self.tcp_port_out),
276 elif IP_PROTOS.udp == ord(record[4]):
277 self.assertEqual(struct.pack("!H", self.udp_port_in),
279 self.assertEqual(struct.pack("!H", self.udp_port_out),
# Any other protocol value fails the test outright.
282 self.fail("Invalid protocol")
283 self.assertEqual(3, nat44_ses_create_num)
284 self.assertEqual(3, nat44_ses_delete_num)
# Helper: checks decoded IPFIX records for the address-exhausted event.
# Expects exactly one record with field 230 equal to 3 and field 283 equal
# to a zero 32-bit network-order value.
286 def verify_ipfix_addr_exhausted(self, data):
288 Verify IPFIX NAT addresses event
290 :param data: Decoded IPFIX data records
292 self.assertEqual(1, len(data))
# `record` is bound on a line not visible in this listing.
295 self.assertEqual(ord(record[230]), 3)
297 self.assertEqual(struct.pack("!I", 0), record[283])
300 class TestSNAT(MethodHolder):
301 """ SNAT Test Cases """
305 super(TestSNAT, cls).setUpClass()
308 cls.tcp_port_in = 6303
309 cls.tcp_port_out = 6303
310 cls.udp_port_in = 6304
311 cls.udp_port_out = 6304
312 cls.icmp_id_in = 6305
313 cls.icmp_id_out = 6305
314 cls.snat_addr = '10.0.0.3'
316 cls.create_pg_interfaces(range(9))
317 cls.interfaces = list(cls.pg_interfaces[0:4])
319 for i in cls.interfaces:
324 cls.pg0.generate_remote_hosts(2)
325 cls.pg0.configure_ipv4_neighbors()
327 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
329 cls.pg4._local_ip4 = "172.16.255.1"
330 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
331 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
332 cls.pg4.set_table_ip4(10)
333 cls.pg5._local_ip4 = "172.16.255.3"
334 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
335 cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
336 cls.pg5.set_table_ip4(10)
337 cls.pg6._local_ip4 = "172.16.255.1"
338 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
339 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
340 cls.pg6.set_table_ip4(20)
341 for i in cls.overlapping_interfaces:
350 super(TestSNAT, cls).tearDownClass()
# Removes SNAT-related state through the VPP API. Visible steps, in order:
# host routes and neighbors for pg7/pg8, pg7's IPv4 config, interface-derived
# SNAT addresses, IPFIX logging, SNAT interface features, static mappings and
# finally the SNAT address pool. Several call argument lists continue on
# lines not visible in this listing.
353 def clear_snat(self):
355 Clear SNAT configuration.
357 # I found no elegant way to do this
# /32 routes towards the pg7 and pg8 remote hosts.
358 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
359 dst_address_length=32,
360 next_hop_address=self.pg7.remote_ip4n,
361 next_hop_sw_if_index=self.pg7.sw_if_index,
363 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
364 dst_address_length=32,
365 next_hop_address=self.pg8.remote_ip4n,
366 next_hop_sw_if_index=self.pg8.sw_if_index,
# Neighbor entries on pg7 and pg8, taken from a dump of each interface.
369 for intf in [self.pg7, self.pg8]:
370 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
372 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
# pg7's address is only unconfigured when it currently has one.
377 if self.pg7.has_ip4_config:
378 self.pg7.unconfig_ip4()
# Interfaces registered as SNAT address sources.
380 interfaces = self.vapi.snat_interface_addr_dump()
381 for intf in interfaces:
382 self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
384 self.vapi.snat_ipfix(enable=0)
# Interfaces with the SNAT feature enabled.
386 interfaces = self.vapi.snat_interface_dump()
387 for intf in interfaces:
388 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
# Static mappings, re-submitted with the fields reported by the dump.
392 static_mappings = self.vapi.snat_static_mapping_dump()
393 for sm in static_mappings:
394 self.vapi.snat_add_static_mapping(sm.local_ip_address,
395 sm.external_ip_address,
396 local_port=sm.local_port,
397 external_port=sm.external_port,
398 addr_only=sm.addr_only,
400 protocol=sm.protocol,
# SNAT address pool entries.
403 adresses = self.vapi.snat_address_dump()
404 for addr in adresses:
405 self.vapi.snat_add_address_range(addr.ip_address,
# Convenience wrapper around self.vapi.snat_add_static_mapping: takes
# dotted-quad strings, converts them to packed binary form and forwards the
# request. The signature continues on a line not visible in this listing
# (the docstring documents a further `proto` parameter).
409 def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
410 local_port=0, external_port=0, vrf_id=0,
411 is_add=1, external_sw_if_index=0xFFFFFFFF,
414 Add/delete S-NAT static mapping
416 :param local_ip: Local IP address
417 :param external_ip: External IP address
418 :param local_port: Local port number (Optional)
419 :param external_port: External port number (Optional)
420 :param vrf_id: VRF ID (Default 0)
421 :param is_add: 1 if add, 0 if delete (Default add)
422 :param external_sw_if_index: External interface instead of IP address
423 :param proto: IP protocol (Mandatory if port specified)
# Both ports must be non-zero for this branch; what it sets is on lines not
# visible in this listing.
426 if local_port and external_port:
# The API takes addresses in packed 4-byte form.
428 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
429 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
430 self.vapi.snat_add_static_mapping(
433 external_sw_if_index,
# Convenience wrapper: adds or removes a single SNAT pool address by passing
# the same packed address as both ends of the range to
# self.vapi.snat_add_address_range.
441 def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
443 Add/delete S-NAT address
445 :param ip: IP address
446 :param is_add: 1 if add, 0 if delete (Default add)
448 snat_addr = socket.inet_pton(socket.AF_INET, ip)
449 self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
# Dynamic translation round trip: pool address plus SNAT feature on pg0 and
# pg1, then an in2out stream verified on pg1 and an out2in stream verified
# on pg0.
452 def test_dynamic(self):
453 """ SNAT dynamic translation test """
# pg0 is enabled with default arguments; the pg1 call continues on a line
# not visible here (presumably marking pg1 as the outside interface - confirm).
455 self.snat_add_address(self.snat_addr)
456 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
457 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
# in2out
461 pkts = self.create_stream_in(self.pg0, self.pg1)
462 self.pg0.add_stream(pkts)
463 self.pg_enable_capture(self.pg_interfaces)
465 capture = self.pg1.get_capture(len(pkts))
466 self.verify_capture_out(capture)
# out2in, aimed at the ports recorded by verify_capture_out above.
469 pkts = self.create_stream_out(self.pg1)
470 self.pg1.add_stream(pkts)
471 self.pg_enable_capture(self.pg_interfaces)
473 capture = self.pg0.get_capture(len(pkts))
474 self.verify_capture_in(capture, self.pg0)
# Sends the inside stream with TTL=1 and expects one ICMP error per packet
# back on the inside interface (pg0); the default expected type in the
# verify helper is 11.
476 def test_dynamic_icmp_errors_in2out_ttl_1(self):
477 """ SNAT handling of client packets with TTL=1 """
479 self.snat_add_address(self.snat_addr)
480 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
481 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
484 # Client side - generate traffic
485 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
486 self.pg0.add_stream(pkts)
487 self.pg_enable_capture(self.pg_interfaces)
490 # Client side - verify ICMP type 11 packets
491 capture = self.pg0.get_capture(len(pkts))
492 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# First creates sessions with a normal in2out stream, then sends the out2in
# stream with TTL=1 and expects ICMP errors back on pg1 whose source is
# pg1's own local address rather than the SNAT address.
494 def test_dynamic_icmp_errors_out2in_ttl_1(self):
495 """ SNAT handling of server packets with TTL=1 """
497 self.snat_add_address(self.snat_addr)
498 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
499 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
502 # Client side - create sessions
503 pkts = self.create_stream_in(self.pg0, self.pg1)
504 self.pg0.add_stream(pkts)
505 self.pg_enable_capture(self.pg_interfaces)
508 # Server side - generate traffic
509 capture = self.pg1.get_capture(len(pkts))
510 self.verify_capture_out(capture)
511 pkts = self.create_stream_out(self.pg1, ttl=1)
512 self.pg1.add_stream(pkts)
513 self.pg_enable_capture(self.pg_interfaces)
516 # Server side - verify ICMP type 11 packets
517 capture = self.pg1.get_capture(len(pkts))
518 self.verify_capture_out_with_icmp_errors(capture,
519 src_ip=self.pg1.local_ip4)
# Sends the inside stream with TTL=2, then plays the server: every captured
# (translated) packet is wrapped in an ICMP type 11 error addressed to the
# SNAT address. The errors must reach the inside host on pg0.
521 def test_dynamic_icmp_errors_in2out_ttl_2(self):
522 """ SNAT handling of error responses to client packets with TTL=2 """
524 self.snat_add_address(self.snat_addr)
525 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
526 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
529 # Client side - generate traffic
530 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
531 self.pg0.add_stream(pkts)
532 self.pg_enable_capture(self.pg_interfaces)
535 # Server side - simulate ICMP type 11 response
536 capture = self.pg1.get_capture(len(pkts))
# The captured packet's IP layer becomes the payload of the ICMP error.
537 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
538 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
539 ICMP(type=11) / packet[IP] for packet in capture]
540 self.pg1.add_stream(pkts)
541 self.pg_enable_capture(self.pg_interfaces)
544 # Client side - verify ICMP type 11 packets
545 capture = self.pg0.get_capture(len(pkts))
546 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# Mirror of the in2out TTL=2 case: sessions are created first, the out2in
# stream is sent with TTL=2, and the inside host then answers each captured
# packet with an ICMP type 11 error. The errors must appear on pg1 with the
# default (SNAT) source address.
548 def test_dynamic_icmp_errors_out2in_ttl_2(self):
549 """ SNAT handling of error responses to server packets with TTL=2 """
551 self.snat_add_address(self.snat_addr)
552 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
553 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
556 # Client side - create sessions
557 pkts = self.create_stream_in(self.pg0, self.pg1)
558 self.pg0.add_stream(pkts)
559 self.pg_enable_capture(self.pg_interfaces)
562 # Server side - generate traffic
563 capture = self.pg1.get_capture(len(pkts))
564 self.verify_capture_out(capture)
565 pkts = self.create_stream_out(self.pg1, ttl=2)
566 self.pg1.add_stream(pkts)
567 self.pg_enable_capture(self.pg_interfaces)
570 # Client side - simulate ICMP type 11 response
571 capture = self.pg0.get_capture(len(pkts))
# The captured packet's IP layer becomes the payload of the ICMP error.
572 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
573 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
574 ICMP(type=11) / packet[IP] for packet in capture]
575 self.pg0.add_stream(pkts)
576 self.pg_enable_capture(self.pg_interfaces)
579 # Server side - verify ICMP type 11 packets
580 capture = self.pg1.get_capture(len(pkts))
581 self.verify_capture_out_with_icmp_errors(capture)
# Sends an ICMP echo-request from the outside host to pg1's own address and
# expects a single echo reply back on pg1, sourced from pg1's local address.
583 def test_ping_out_interface_from_outside(self):
584 """ Ping SNAT out interface from outside network """
586 self.snat_add_address(self.snat_addr)
587 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
588 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
591 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
592 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
593 ICMP(id=self.icmp_id_out, type='echo-request'))
# `pkts` is bound on a line not visible in this listing.
595 self.pg1.add_stream(pkts)
596 self.pg_enable_capture(self.pg_interfaces)
598 capture = self.pg1.get_capture(len(pkts))
599 self.assertEqual(1, len(capture))
602 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
603 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
# NOTE(review): the request above was built with id=self.icmp_id_out while
# this assertion compares against self.icmp_id_in. Both class defaults are
# 6305, so the check holds here; confirm which attribute is intended.
604 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
605 self.assertEqual(packet[ICMP].type, 0) # echo reply
# Failure path: dump the offending packet to the test log.
607 self.logger.error(ppp("Unexpected or invalid packet "
608 "(outside network):", packet))
# With a static mapping from the inside host to the SNAT address, an echo
# request sent from outside to the SNAT address must reach the inside host,
# and the inside host's echo reply must come back out translated with an
# unchanged ICMP id (same_port=True).
611 def test_ping_internal_host_from_outside(self):
612 """ Ping internal host from outside network """
614 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
615 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
616 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
# out2in echo request
620 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
621 IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
622 ICMP(id=self.icmp_id_out, type='echo-request'))
623 self.pg1.add_stream(pkt)
624 self.pg_enable_capture(self.pg_interfaces)
626 capture = self.pg0.get_capture(1)
627 self.verify_capture_in(capture, self.pg0, packet_num=1)
628 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# in2out echo reply
631 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
632 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
633 ICMP(id=self.icmp_id_in, type='echo-reply'))
634 self.pg0.add_stream(pkt)
635 self.pg_enable_capture(self.pg_interfaces)
637 capture = self.pg1.get_capture(1)
638 self.verify_capture_out(capture, same_port=True, packet_num=1)
639 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Address-only static mapping, traffic started from the inside: the in2out
# capture must show nat_ip with unchanged ports (same_port=True), and the
# out2in stream sent to nat_ip must reach the inside host.
641 def test_static_in(self):
642 """ SNAT 1:1 NAT initialized from inside network """
# `nat_ip` is bound on a line not visible in this listing. The outside
# ports/id are reset to the same numbers as the inside class defaults.
645 self.tcp_port_out = 6303
646 self.udp_port_out = 6304
647 self.icmp_id_out = 6305
649 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
650 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
651 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
# in2out
655 pkts = self.create_stream_in(self.pg0, self.pg1)
656 self.pg0.add_stream(pkts)
657 self.pg_enable_capture(self.pg_interfaces)
659 capture = self.pg1.get_capture(len(pkts))
660 self.verify_capture_out(capture, nat_ip, True)
# out2in
663 pkts = self.create_stream_out(self.pg1, nat_ip)
664 self.pg1.add_stream(pkts)
665 self.pg_enable_capture(self.pg_interfaces)
667 capture = self.pg0.get_capture(len(pkts))
668 self.verify_capture_in(capture, self.pg0)
# Address-only static mapping, traffic started from the outside: same checks
# as test_static_in but the out2in stream is sent first.
670 def test_static_out(self):
671 """ SNAT 1:1 NAT initialized from outside network """
# `nat_ip` is bound on a line not visible in this listing. The outside
# ports/id are reset to the same numbers as the inside class defaults.
674 self.tcp_port_out = 6303
675 self.udp_port_out = 6304
676 self.icmp_id_out = 6305
678 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
679 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
680 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
# out2in
684 pkts = self.create_stream_out(self.pg1, nat_ip)
685 self.pg1.add_stream(pkts)
686 self.pg_enable_capture(self.pg_interfaces)
688 capture = self.pg0.get_capture(len(pkts))
689 self.verify_capture_in(capture, self.pg0)
# in2out
692 pkts = self.create_stream_in(self.pg0, self.pg1)
693 self.pg0.add_stream(pkts)
694 self.pg_enable_capture(self.pg_interfaces)
696 capture = self.pg1.get_capture(len(pkts))
697 self.verify_capture_out(capture, nat_ip, True)
# Port-level static mappings (one each for the TCP port, UDP port and ICMP
# id) on the SNAT address, traffic started from the inside. The TCP and UDP
# mapping calls continue on lines not visible in this listing.
699 def test_static_with_port_in(self):
700 """ SNAT 1:1 NAT with port initialized from inside network """
# Outside values chosen to differ from the inside ones (6303/6304/6305).
702 self.tcp_port_out = 3606
703 self.udp_port_out = 3607
704 self.icmp_id_out = 3608
706 self.snat_add_address(self.snat_addr)
707 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
708 self.tcp_port_in, self.tcp_port_out,
710 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
711 self.udp_port_in, self.udp_port_out,
713 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
714 self.icmp_id_in, self.icmp_id_out,
715 proto=IP_PROTOS.icmp)
716 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
717 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
# in2out
721 pkts = self.create_stream_in(self.pg0, self.pg1)
722 self.pg0.add_stream(pkts)
723 self.pg_enable_capture(self.pg_interfaces)
725 capture = self.pg1.get_capture(len(pkts))
726 self.verify_capture_out(capture)
# out2in
729 pkts = self.create_stream_out(self.pg1)
730 self.pg1.add_stream(pkts)
731 self.pg_enable_capture(self.pg_interfaces)
733 capture = self.pg0.get_capture(len(pkts))
734 self.verify_capture_in(capture, self.pg0)
# Port-level static mappings (one each for the TCP port, UDP port and ICMP
# id) on the SNAT address, traffic started from the outside. The TCP and UDP
# mapping calls continue on lines not visible in this listing.
736 def test_static_with_port_out(self):
737 """ SNAT 1:1 NAT with port initialized from outside network """
# Outside values chosen to differ from the inside ones (6303/6304/6305).
739 self.tcp_port_out = 30606
740 self.udp_port_out = 30607
741 self.icmp_id_out = 30608
743 self.snat_add_address(self.snat_addr)
744 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
745 self.tcp_port_in, self.tcp_port_out,
747 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
748 self.udp_port_in, self.udp_port_out,
750 self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
751 self.icmp_id_in, self.icmp_id_out,
752 proto=IP_PROTOS.icmp)
753 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
754 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
# out2in
758 pkts = self.create_stream_out(self.pg1)
759 self.pg1.add_stream(pkts)
760 self.pg_enable_capture(self.pg_interfaces)
762 capture = self.pg0.get_capture(len(pkts))
763 self.verify_capture_in(capture, self.pg0)
# in2out
766 pkts = self.create_stream_in(self.pg0, self.pg1)
767 self.pg0.add_stream(pkts)
768 self.pg_enable_capture(self.pg_interfaces)
770 capture = self.pg1.get_capture(len(pkts))
771 self.verify_capture_out(capture)
# Two address-only static mappings (pg4's host -> nat_ip1, pg0's host ->
# nat_ip2); both calls continue on lines not visible in this listing. Traffic
# from pg4 must leave pg3 translated to nat_ip1 with unchanged ports, while
# traffic from pg0 must produce nothing on pg3.
773 def test_static_vrf_aware(self):
774 """ SNAT 1:1 NAT VRF awareness """
776 nat_ip1 = "10.0.0.30"
777 nat_ip2 = "10.0.0.40"
778 self.tcp_port_out = 6303
779 self.udp_port_out = 6304
780 self.icmp_id_out = 6305
782 self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
784 self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
# pg3 call continues on a line not visible here; pg0 and pg4 are enabled
# with default arguments.
786 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
788 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
789 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
791 # inside interface VRF match SNAT static mapping VRF
792 pkts = self.create_stream_in(self.pg4, self.pg3)
793 self.pg4.add_stream(pkts)
794 self.pg_enable_capture(self.pg_interfaces)
796 capture = self.pg3.get_capture(len(pkts))
797 self.verify_capture_out(capture, nat_ip1, True)
799 # inside interface VRF don't match SNAT static mapping VRF (packets
801 pkts = self.create_stream_in(self.pg0, self.pg3)
802 self.pg0.add_stream(pkts)
803 self.pg_enable_capture(self.pg_interfaces)
805 self.pg3.assert_nothing_captured()
# pg0 and pg1 are both enabled with default arguments and pg3 with extra
# arguments on a line not visible here. Checks that traffic between pg0 and
# pg1, and from pg0 to pg2 (no SNAT feature), is not translated, and that
# both pg0 and pg1 get translated round trips through pg3.
807 def test_multiple_inside_interfaces(self):
808 """ SNAT multiple inside interfaces (non-overlapping address space) """
810 self.snat_add_address(self.snat_addr)
811 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
812 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
813 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
816 # between two S-NAT inside interfaces (no translation)
817 pkts = self.create_stream_in(self.pg0, self.pg1)
818 self.pg0.add_stream(pkts)
819 self.pg_enable_capture(self.pg_interfaces)
821 capture = self.pg1.get_capture(len(pkts))
822 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
824 # from S-NAT inside to interface without S-NAT feature (no translation)
825 pkts = self.create_stream_in(self.pg0, self.pg2)
826 self.pg0.add_stream(pkts)
827 self.pg_enable_capture(self.pg_interfaces)
829 capture = self.pg2.get_capture(len(pkts))
830 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
832 # in2out 1st interface
833 pkts = self.create_stream_in(self.pg0, self.pg3)
834 self.pg0.add_stream(pkts)
835 self.pg_enable_capture(self.pg_interfaces)
837 capture = self.pg3.get_capture(len(pkts))
838 self.verify_capture_out(capture)
840 # out2in 1st interface
841 pkts = self.create_stream_out(self.pg3)
842 self.pg3.add_stream(pkts)
843 self.pg_enable_capture(self.pg_interfaces)
845 capture = self.pg0.get_capture(len(pkts))
846 self.verify_capture_in(capture, self.pg0)
848 # in2out 2nd interface
849 pkts = self.create_stream_in(self.pg1, self.pg3)
850 self.pg1.add_stream(pkts)
851 self.pg_enable_capture(self.pg_interfaces)
853 capture = self.pg3.get_capture(len(pkts))
854 self.verify_capture_out(capture)
856 # out2in 2nd interface
857 pkts = self.create_stream_out(self.pg3)
858 self.pg3.add_stream(pkts)
859 self.pg_enable_capture(self.pg_interfaces)
861 capture = self.pg1.get_capture(len(pkts))
862 self.verify_capture_in(capture, self.pg1)
# Exercises pg4, pg5 and pg6 (the "overlapping" group configured in the class
# fixture) against pg3. pg6's host additionally gets a static mapping to
# static_nat_ip (call continues on a line not visible here). Covers: no
# translation between pg4 and pg5, hairpinning from pg4 to pg6 via the static
# address, translated round trips for all three interfaces, and consistency
# checks on the user / session dumps returned by the API.
864 def test_inside_overlapping_interfaces(self):
865 """ SNAT multiple inside interfaces with overlapping address space """
867 static_nat_ip = "10.0.0.10"
868 self.snat_add_address(self.snat_addr)
869 self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
871 self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
872 self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
873 self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
874 self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
877 # between S-NAT inside interfaces with same VRF (no translation)
878 pkts = self.create_stream_in(self.pg4, self.pg5)
879 self.pg4.add_stream(pkts)
880 self.pg_enable_capture(self.pg_interfaces)
882 capture = self.pg5.get_capture(len(pkts))
883 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
885 # between S-NAT inside interfaces with different VRF (hairpinning)
886 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
887 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
888 TCP(sport=1234, dport=5678))
889 self.pg4.add_stream(p)
890 self.pg_enable_capture(self.pg_interfaces)
892 capture = self.pg6.get_capture(1)
# `ip` and `tcp` are bound on lines not visible in this listing. Expected:
# source rewritten to the SNAT address with a new source port, destination
# rewritten to pg6's host with the destination port unchanged.
897 self.assertEqual(ip.src, self.snat_addr)
898 self.assertEqual(ip.dst, self.pg6.remote_ip4)
899 self.assertNotEqual(tcp.sport, 1234)
900 self.assertEqual(tcp.dport, 5678)
902 self.logger.error(ppp("Unexpected or invalid packet:", p))
905 # in2out 1st interface
906 pkts = self.create_stream_in(self.pg4, self.pg3)
907 self.pg4.add_stream(pkts)
908 self.pg_enable_capture(self.pg_interfaces)
910 capture = self.pg3.get_capture(len(pkts))
911 self.verify_capture_out(capture)
913 # out2in 1st interface
914 pkts = self.create_stream_out(self.pg3)
915 self.pg3.add_stream(pkts)
916 self.pg_enable_capture(self.pg_interfaces)
918 capture = self.pg4.get_capture(len(pkts))
919 self.verify_capture_in(capture, self.pg4)
921 # in2out 2nd interface
922 pkts = self.create_stream_in(self.pg5, self.pg3)
923 self.pg5.add_stream(pkts)
924 self.pg_enable_capture(self.pg_interfaces)
926 capture = self.pg3.get_capture(len(pkts))
927 self.verify_capture_out(capture)
929 # out2in 2nd interface
930 pkts = self.create_stream_out(self.pg3)
931 self.pg3.add_stream(pkts)
932 self.pg_enable_capture(self.pg_interfaces)
934 capture = self.pg5.get_capture(len(pkts))
935 self.verify_capture_in(capture, self.pg5)
# Session dump for pg5's host (second argument 10 matches the table id given
# to pg5 in the class fixture): exactly three dynamic sessions are expected,
# in the order TCP, UDP, ICMP, with the ports recorded by the verify helpers.
938 addresses = self.vapi.snat_address_dump()
939 self.assertEqual(len(addresses), 1)
940 sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
941 self.assertEqual(len(sessions), 3)
942 for session in sessions:
943 self.assertFalse(session.is_static)
944 self.assertEqual(session.inside_ip_address[0:4],
945 self.pg5.remote_ip4n)
946 self.assertEqual(session.outside_ip_address,
947 addresses[0].ip_address)
948 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
949 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
950 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
951 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
952 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
953 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
954 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
955 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
956 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
958 # in2out 3rd interface
959 pkts = self.create_stream_in(self.pg6, self.pg3)
960 self.pg6.add_stream(pkts)
961 self.pg_enable_capture(self.pg_interfaces)
963 capture = self.pg3.get_capture(len(pkts))
964 self.verify_capture_out(capture, static_nat_ip, True)
966 # out2in 3rd interface
967 pkts = self.create_stream_out(self.pg3, static_nat_ip)
968 self.pg3.add_stream(pkts)
969 self.pg_enable_capture(self.pg_interfaces)
971 capture = self.pg6.get_capture(len(pkts))
972 self.verify_capture_in(capture, self.pg6)
974 # general user and session dump verifications
975 users = self.vapi.snat_user_dump()
976 self.assertTrue(len(users) >= 3)
977 addresses = self.vapi.snat_address_dump()
978 self.assertEqual(len(addresses), 1)
# `user` is bound on a line not visible in this listing (presumably a loop
# over `users` - confirm).
980 sessions = self.vapi.snat_user_session_dump(user.ip_address,
982 for session in sessions:
983 self.assertEqual(user.ip_address, session.inside_ip_address)
984 self.assertTrue(session.total_bytes > session.total_pkts > 0)
985 self.assertTrue(session.protocol in
986 [IP_PROTOS.tcp, IP_PROTOS.udp,
# pg4's host: at least four dynamic sessions, all on the pool address.
990 sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
991 self.assertTrue(len(sessions) >= 4)
992 for session in sessions:
993 self.assertFalse(session.is_static)
994 self.assertEqual(session.inside_ip_address[0:4],
995 self.pg4.remote_ip4n)
996 self.assertEqual(session.outside_ip_address,
997 addresses[0].ip_address)
# pg6's host: at least three static sessions on static_nat_ip.
1000 sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1001 self.assertTrue(len(sessions) >= 3)
1002 for session in sessions:
1003 self.assertTrue(session.is_static)
1004 self.assertEqual(session.inside_ip_address[0:4],
1005 self.pg6.remote_ip4n)
# NOTE(review): comparing two map() results element-wise relies on map()
# returning lists (Python 2); on Python 3 both sides would be iterators that
# never compare equal.
1006 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1007 map(int, static_nat_ip.split('.')))
1008 self.assertTrue(session.inside_port in
1009 [self.tcp_port_in, self.udp_port_in,
def test_hairpinning(self):
    """ SNAT hairpinning """
    # Host and server both sit behind the inside interface pg0.  The server
    # is published through a static TCP mapping on the SNAT address, and the
    # host reaches it via that address, so each direction of the flow is
    # injected on pg0 and captured back on pg0.
    #
    # NOTE(review): the extracted view of this file omitted several lines
    # of this test (the host_in_port assignment, the is_inside=0 argument,
    # the pg_start() calls and the try/except scaffolding).  They are
    # restored here from the surrounding usage; 1234 for host_in_port is an
    # assumed value -- confirm against the full file.
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    host_in_port = 1234
    server_in_port = 5678
    server_out_port = 8765

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    # add static mapping for server
    self.snat_add_static_mapping(server.ip4, self.snat_addr,
                                 server_in_port, server_out_port,
                                 proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.snat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        # the server must see the SNAT address as source and its own
        # inside address/port as destination
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        # remember the translated source port for the reply direction
        host_out_port = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.snat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        # the host must see the server's outside identity and its own
        # original address/port
        self.assertEqual(ip.src, self.snat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
    except Exception:
        # the packet belongs inside ppp(); passing it to logger.error
        # instead made this handler fail and hide the real assertion error
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
1072 def test_max_translations_per_user(self):
1073 """ MAX translations per user - recycle the least recently used """
1075 self.snat_add_address(self.snat_addr)
1076 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1077 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1080 # get maximum number of translations per user
1081 snat_config = self.vapi.snat_show_config()
1083 # send more than maximum number of translations per user packets
1084 pkts_num = snat_config.max_translations_per_user + 5
1086 for port in range(0, pkts_num):
1087 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1088 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1089 TCP(sport=1025 + port))
1091 self.pg0.add_stream(pkts)
1092 self.pg_enable_capture(self.pg_interfaces)
1095 # verify number of translated packet
1096 self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire SNAT addresses from interface """
    # The SNAT pool is bound to pg7 and dumped three times: before pg7 has
    # an IPv4 address, after one is configured, and after it is removed.
    pg7 = self.pg7
    self.vapi.snat_add_interface_addr(pg7.sw_if_index)

    # nothing in the pool yet
    self.assertEqual(0, len(self.vapi.snat_address_dump()))

    # configuring the interface address must yield exactly one pool entry
    # whose first four address bytes equal pg7's local address
    pg7.config_ip4()
    pool = self.vapi.snat_address_dump()
    self.assertEqual(1, len(pool))
    self.assertEqual(pool[0].ip_address[0:4], pg7.local_ip4n)

    # removing the interface address must empty the pool again
    pg7.unconfig_ip4()
    self.assertEqual(0, len(self.vapi.snat_address_dump()))
def test_interface_addr_static_mapping(self):
    """ Static mapping with addresses from interface """
    # A static mapping for local address 1.2.3.4 is created against pg7
    # (by interface index) and dumped before, during and after pg7 holds an
    # IPv4 address.
    pg7 = self.pg7
    self.vapi.snat_add_interface_addr(pg7.sw_if_index)
    self.snat_add_static_mapping('1.2.3.4',
                                 external_sw_if_index=pg7.sw_if_index)

    # no address on pg7 yet: the single mapping still refers to the
    # interface by index
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(1, len(mappings))
    self.assertEqual(pg7.sw_if_index, mappings[0].external_sw_if_index)

    # with an address configured, the single mapping carries pg7's local
    # address and its external_sw_if_index reads 0xFFFFFFFF
    # (presumably the "no interface" marker -- confirm against the API)
    pg7.config_ip4()
    mappings = self.vapi.snat_static_mapping_dump()
    self.assertEqual(1, len(mappings))
    resolved = mappings[0]
    self.assertEqual(resolved.external_ip_address[0:4], pg7.local_ip4n)
    self.assertEqual(0xFFFFFFFF, resolved.external_sw_if_index)

    # removing the address must leave no static mapping in the dump
    pg7.unconfig_ip4()
    self.assertEqual(0, len(self.vapi.snat_static_mapping_dump()))
1142 def test_ipfix_nat44_sess(self):
""" S-NAT IPFIX logging NAT44 session created/deleted """
1144 self.snat_add_address(self.snat_addr)
1145 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1146 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1148 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1149 src_address=self.pg3.local_ip4n,
1151 template_interval=10)
1152 self.vapi.snat_ipfix()
1154 pkts = self.create_stream_in(self.pg0, self.pg1)
1155 self.pg0.add_stream(pkts)
1156 self.pg_enable_capture(self.pg_interfaces)
1158 capture = self.pg1.get_capture(len(pkts))
1159 self.verify_capture_out(capture)
1160 self.snat_add_address(self.snat_addr, is_add=0)
1161 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1162 capture = self.pg3.get_capture(3)
1163 ipfix = IPFIXDecoder()
1164 # first load template
1166 self.assertTrue(p.haslayer(IPFIX))
1167 if p.haslayer(Template):
1168 ipfix.add_template(p.getlayer(Template))
1169 # verify events in data set
1171 if p.haslayer(Data):
1172 data = ipfix.decode_data_set(p.getlayer(Set))
1173 self.verify_ipfix_nat44_ses(data)
1175 def test_ipfix_addr_exhausted(self):
1176 """ S-NAT IPFIX logging NAT addresses exhausted """
1177 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1178 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1180 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1181 src_address=self.pg3.local_ip4n,
1183 template_interval=10)
1184 self.vapi.snat_ipfix()
1186 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1187 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1189 self.pg0.add_stream(p)
1190 self.pg_enable_capture(self.pg_interfaces)
1192 capture = self.pg1.get_capture(0)
1193 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1194 capture = self.pg3.get_capture(3)
1195 ipfix = IPFIXDecoder()
1196 # first load template
1198 self.assertTrue(p.haslayer(IPFIX))
1199 if p.haslayer(Template):
1200 ipfix.add_template(p.getlayer(Template))
1201 # verify events in data set
1203 if p.haslayer(Data):
1204 data = ipfix.decode_data_set(p.getlayer(Set))
1205 self.verify_ipfix_addr_exhausted(data)
def test_pool_addr_fib(self):
    """ S-NAT add pool addresses to FIB """
    # Sends broadcast ARP who-has requests for the SNAT pool address and
    # for a 1:1 static mapping address, and checks that pg1 captures an ARP
    # is-at reply while the addresses are configured and captures nothing
    # once they are removed.
    #
    # NOTE(review): the extracted view of this file omitted the
    # "is_inside=0)" / "is_add=0)" continuation lines and the pg_start()
    # calls; they are restored here from the call pattern -- confirm
    # against the full file.
    static_addr = '10.0.0.10'

    def send_arp_request(src_if, target_ip):
        """ Inject one broadcast ARP who-has for target_ip on src_if """
        request = (Ether(src=src_if.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
                   ARP(op=ARP.who_has, pdst=target_ip,
                       psrc=src_if.remote_ip4, hwsrc=src_if.remote_mac))
        src_if.add_stream(request)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def assert_arp_reply_on_pg1():
        """ Expect exactly one ARP is-at packet captured on pg1 """
        capture = self.pg1.get_capture(1)
        self.assertTrue(capture[0].haslayer(ARP))
        # assertEqual, not assertTrue: with assertTrue the second argument
        # is only the failure message, so any non-zero opcode would pass
        self.assertEqual(capture[0][ARP].op, ARP.is_at)

    self.snat_add_address(self.snat_addr)
    self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                             is_inside=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)

    # SNAT address
    send_arp_request(self.pg1, self.snat_addr)
    assert_arp_reply_on_pg1()

    # 1:1 NAT address
    send_arp_request(self.pg1, static_addr)
    assert_arp_reply_on_pg1()

    # send ARP to non-SNAT interface
    send_arp_request(self.pg2, self.snat_addr)
    self.pg1.get_capture(0)

    # remove addresses and verify
    self.snat_add_address(self.snat_addr, is_add=0)
    self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                 is_add=0)

    send_arp_request(self.pg1, self.snat_addr)
    self.pg1.get_capture(0)

    send_arp_request(self.pg1, static_addr)
    self.pg1.get_capture(0)
1268 def test_vrf_mode(self):
1269 """ S-NAT tenant VRF aware address pool mode """
1273 nat_ip1 = "10.0.0.10"
1274 nat_ip2 = "10.0.0.11"
1276 self.pg0.unconfig_ip4()
1277 self.pg1.unconfig_ip4()
1278 self.pg0.set_table_ip4(vrf_id1)
1279 self.pg1.set_table_ip4(vrf_id2)
1280 self.pg0.config_ip4()
1281 self.pg1.config_ip4()
1283 self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1284 self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1285 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1286 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1287 self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1291 pkts = self.create_stream_in(self.pg0, self.pg2)
1292 self.pg0.add_stream(pkts)
1293 self.pg_enable_capture(self.pg_interfaces)
1295 capture = self.pg2.get_capture(len(pkts))
1296 self.verify_capture_out(capture, nat_ip1)
1299 pkts = self.create_stream_in(self.pg1, self.pg2)
1300 self.pg1.add_stream(pkts)
1301 self.pg_enable_capture(self.pg_interfaces)
1303 capture = self.pg2.get_capture(len(pkts))
1304 self.verify_capture_out(capture, nat_ip2)
1306 def test_vrf_feature_independent(self):
1307 """ S-NAT tenant VRF independent address pool mode """
1309 nat_ip1 = "10.0.0.10"
1310 nat_ip2 = "10.0.0.11"
1312 self.snat_add_address(nat_ip1)
1313 self.snat_add_address(nat_ip2)
1314 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1315 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1316 self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1320 pkts = self.create_stream_in(self.pg0, self.pg2)
1321 self.pg0.add_stream(pkts)
1322 self.pg_enable_capture(self.pg_interfaces)
1324 capture = self.pg2.get_capture(len(pkts))
1325 self.verify_capture_out(capture, nat_ip1)
1328 pkts = self.create_stream_in(self.pg1, self.pg2)
1329 self.pg1.add_stream(pkts)
1330 self.pg_enable_capture(self.pg_interfaces)
1332 capture = self.pg2.get_capture(len(pkts))
1333 self.verify_capture_out(capture, nat_ip1)
1335 def test_dynamic_ipless_interfaces(self):
1336 """ SNAT interfaces without configured ip dynamic map """
1338 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1339 self.pg7.remote_mac,
1340 self.pg7.remote_ip4n,
1342 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1343 self.pg8.remote_mac,
1344 self.pg8.remote_ip4n,
1347 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1348 dst_address_length=32,
1349 next_hop_address=self.pg7.remote_ip4n,
1350 next_hop_sw_if_index=self.pg7.sw_if_index)
1351 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1352 dst_address_length=32,
1353 next_hop_address=self.pg8.remote_ip4n,
1354 next_hop_sw_if_index=self.pg8.sw_if_index)
1356 self.snat_add_address(self.snat_addr)
1357 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1358 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1362 pkts = self.create_stream_in(self.pg7, self.pg8)
1363 self.pg7.add_stream(pkts)
1364 self.pg_enable_capture(self.pg_interfaces)
1366 capture = self.pg8.get_capture(len(pkts))
1367 self.verify_capture_out(capture)
1370 pkts = self.create_stream_out(self.pg8, self.snat_addr)
1371 self.pg8.add_stream(pkts)
1372 self.pg_enable_capture(self.pg_interfaces)
1374 capture = self.pg7.get_capture(len(pkts))
1375 self.verify_capture_in(capture, self.pg7)
1377 def test_static_ipless_interfaces(self):
1378 """ SNAT 1:1 NAT interfaces without configured ip """
1380 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1381 self.pg7.remote_mac,
1382 self.pg7.remote_ip4n,
1384 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1385 self.pg8.remote_mac,
1386 self.pg8.remote_ip4n,
1389 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1390 dst_address_length=32,
1391 next_hop_address=self.pg7.remote_ip4n,
1392 next_hop_sw_if_index=self.pg7.sw_if_index)
1393 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1394 dst_address_length=32,
1395 next_hop_address=self.pg8.remote_ip4n,
1396 next_hop_sw_if_index=self.pg8.sw_if_index)
1398 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1399 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1400 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1404 pkts = self.create_stream_out(self.pg8)
1405 self.pg8.add_stream(pkts)
1406 self.pg_enable_capture(self.pg_interfaces)
1408 capture = self.pg7.get_capture(len(pkts))
1409 self.verify_capture_in(capture, self.pg7)
1412 pkts = self.create_stream_in(self.pg7, self.pg8)
1413 self.pg7.add_stream(pkts)
1414 self.pg_enable_capture(self.pg_interfaces)
1416 capture = self.pg8.get_capture(len(pkts))
1417 self.verify_capture_out(capture, self.snat_addr, True)
1419 def test_static_with_port_ipless_interfaces(self):
1420 """ SNAT 1:1 NAT with port interfaces without configured ip """
1422 self.tcp_port_out = 30606
1423 self.udp_port_out = 30607
1424 self.icmp_id_out = 30608
1426 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1427 self.pg7.remote_mac,
1428 self.pg7.remote_ip4n,
1430 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1431 self.pg8.remote_mac,
1432 self.pg8.remote_ip4n,
1435 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1436 dst_address_length=32,
1437 next_hop_address=self.pg7.remote_ip4n,
1438 next_hop_sw_if_index=self.pg7.sw_if_index)
1439 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1440 dst_address_length=32,
1441 next_hop_address=self.pg8.remote_ip4n,
1442 next_hop_sw_if_index=self.pg8.sw_if_index)
1444 self.snat_add_address(self.snat_addr)
1445 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1446 self.tcp_port_in, self.tcp_port_out,
1447 proto=IP_PROTOS.tcp)
1448 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1449 self.udp_port_in, self.udp_port_out,
1450 proto=IP_PROTOS.udp)
1451 self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1452 self.icmp_id_in, self.icmp_id_out,
1453 proto=IP_PROTOS.icmp)
1454 self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1455 self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1459 pkts = self.create_stream_out(self.pg8)
1460 self.pg8.add_stream(pkts)
1461 self.pg_enable_capture(self.pg_interfaces)
1463 capture = self.pg7.get_capture(len(pkts))
1464 self.verify_capture_in(capture, self.pg7)
1467 pkts = self.create_stream_in(self.pg7, self.pg8)
1468 self.pg7.add_stream(pkts)
1469 self.pg_enable_capture(self.pg_interfaces)
1471 capture = self.pg8.get_capture(len(pkts))
1472 self.verify_capture_out(capture)
1475 super(TestSNAT, self).tearDown()
1476 if not self.vpp_dead:
1477 self.logger.info(self.vapi.cli("show snat verbose"))
1481 class TestDeterministicNAT(MethodHolder):
1482 """ Deterministic NAT Test Cases """
1485 def setUpConstants(cls):
1486 super(TestDeterministicNAT, cls).setUpConstants()
1487 cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1490 def setUpClass(cls):
1491 super(TestDeterministicNAT, cls).setUpClass()
1494 cls.tcp_port_in = 6303
1495 cls.tcp_external_port = 6303
1496 cls.udp_port_in = 6304
1497 cls.udp_external_port = 6304
1498 cls.icmp_id_in = 6305
1499 cls.snat_addr = '10.0.0.3'
1501 cls.create_pg_interfaces(range(3))
1502 cls.interfaces = list(cls.pg_interfaces)
1504 for i in cls.interfaces:
1509 cls.pg0.generate_remote_hosts(2)
1510 cls.pg0.configure_ipv4_neighbors()
1513 super(TestDeterministicNAT, cls).tearDownClass()
1516 def create_stream_in(self, in_if, out_if, ttl=64):
1518 Create packet stream for inside network
1520 :param in_if: Inside interface
1521 :param out_if: Outside interface
1522 :param ttl: TTL of generated packets
1526 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1527 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1528 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1532 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1533 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1534 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
1538 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1539 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1540 ICMP(id=self.icmp_id_in, type='echo-request'))
1545 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
1547 Create packet stream for outside network
1549 :param out_if: Outside interface
1550 :param dst_ip: Destination IP address (Default use global SNAT address)
1551 :param ttl: TTL of generated packets
1554 dst_ip = self.snat_addr
1557 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1558 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1559 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
1563 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1564 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1565 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
1569 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1570 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1571 ICMP(id=self.icmp_external_id, type='echo-reply'))
1576 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
1578 Verify captured packets on outside network
1580 :param capture: Captured packets
1581 :param nat_ip: Translated IP address (Default use global SNAT address)
:param packet_num: Expected number of packets (Default 3)
1586 nat_ip = self.snat_addr
1587 self.assertEqual(packet_num, len(capture))
1588 for packet in capture:
1590 self.assertEqual(packet[IP].src, nat_ip)
1591 if packet.haslayer(TCP):
1592 self.tcp_port_out = packet[TCP].sport
1593 elif packet.haslayer(UDP):
1594 self.udp_port_out = packet[UDP].sport
1596 self.icmp_external_id = packet[ICMP].id
1598 self.logger.error(ppp("Unexpected or invalid packet "
1599 "(outside network):", packet))
1602 def initiate_tcp_session(self, in_if, out_if):
1604 Initiates TCP session
1606 :param in_if: Inside interface
1607 :param out_if: Outside interface
1610 # SYN packet in->out
1611 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1612 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1613 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1616 self.pg_enable_capture(self.pg_interfaces)
1618 capture = out_if.get_capture(1)
1620 self.tcp_port_out = p[TCP].sport
1622 # SYN + ACK packet out->in
1623 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
1624 IP(src=out_if.remote_ip4, dst=self.snat_addr) /
1625 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1627 out_if.add_stream(p)
1628 self.pg_enable_capture(self.pg_interfaces)
1630 in_if.get_capture(1)
1632 # ACK packet in->out
1633 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1634 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1635 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1638 self.pg_enable_capture(self.pg_interfaces)
1640 out_if.get_capture(1)
1643 self.logger.error("TCP 3 way handshake failed")
1646 def verify_ipfix_max_entries_per_user(self, data):
1648 Verify IPFIX maximum entries per user exceeded event
1650 :param data: Decoded IPFIX data records
1652 self.assertEqual(1, len(data))
1655 self.assertEqual(ord(record[230]), 13)
1656 # natQuotaExceededEvent
1657 self.assertEqual('\x03\x00\x00\x00', record[466])
1659 self.assertEqual(self.pg0.remote_ip4n, record[8])
1661 def test_deterministic_mode(self):
1662 """ S-NAT run deterministic mode """
1663 in_addr = '172.16.255.0'
1664 out_addr = '172.17.255.50'
1665 in_addr_t = '172.16.255.20'
1666 in_addr_n = socket.inet_aton(in_addr)
1667 out_addr_n = socket.inet_aton(out_addr)
1668 in_addr_t_n = socket.inet_aton(in_addr_t)
1672 snat_config = self.vapi.snat_show_config()
1673 self.assertEqual(1, snat_config.deterministic)
1675 self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
1677 rep1 = self.vapi.snat_det_forward(in_addr_t_n)
1678 self.assertEqual(rep1.out_addr[:4], out_addr_n)
1679 rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
1680 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
1682 deterministic_mappings = self.vapi.snat_det_map_dump()
1683 self.assertEqual(len(deterministic_mappings), 1)
1684 dsm = deterministic_mappings[0]
1685 self.assertEqual(in_addr_n, dsm.in_addr[:4])
1686 self.assertEqual(in_plen, dsm.in_plen)
1687 self.assertEqual(out_addr_n, dsm.out_addr[:4])
1688 self.assertEqual(out_plen, dsm.out_plen)
1691 deterministic_mappings = self.vapi.snat_det_map_dump()
1692 self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    # Read the current timeouts, request each one raised by 10, read them
    # back and check that every value differs from what it was before.
    before = self.vapi.snat_det_get_timeouts()

    bump = 10
    self.vapi.snat_det_set_timeouts(before.udp + bump,
                                    before.tcp_established + bump,
                                    before.tcp_transitory + bump,
                                    before.icmp + bump)

    after = self.vapi.snat_det_get_timeouts()

    for field in ('udp', 'icmp', 'tcp_established', 'tcp_transitory'):
        self.assertNotEqual(getattr(before, field), getattr(after, field))
1712 def test_det_in(self):
1713 """ CGNAT translation test (TCP, UDP, ICMP) """
1715 nat_ip = "10.0.0.10"
1717 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1719 socket.inet_aton(nat_ip),
1721 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1722 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1726 pkts = self.create_stream_in(self.pg0, self.pg1)
1727 self.pg0.add_stream(pkts)
1728 self.pg_enable_capture(self.pg_interfaces)
1730 capture = self.pg1.get_capture(len(pkts))
1731 self.verify_capture_out(capture, nat_ip)
1734 pkts = self.create_stream_out(self.pg1, nat_ip)
1735 self.pg1.add_stream(pkts)
1736 self.pg_enable_capture(self.pg_interfaces)
1738 capture = self.pg0.get_capture(len(pkts))
1739 self.verify_capture_in(capture, self.pg0)
1742 sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
1743 self.assertEqual(len(sessions), 3)
1747 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
1748 self.assertEqual(s.in_port, self.tcp_port_in)
1749 self.assertEqual(s.out_port, self.tcp_port_out)
1750 self.assertEqual(s.ext_port, self.tcp_external_port)
1754 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
1755 self.assertEqual(s.in_port, self.udp_port_in)
1756 self.assertEqual(s.out_port, self.udp_port_out)
1757 self.assertEqual(s.ext_port, self.udp_external_port)
1761 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
1762 self.assertEqual(s.in_port, self.icmp_id_in)
1763 self.assertEqual(s.out_port, self.icmp_external_id)
1765 def test_multiple_users(self):
1766 """ CGNAT multiple users """
1768 nat_ip = "10.0.0.10"
1770 external_port = 6303
1772 host0 = self.pg0.remote_hosts[0]
1773 host1 = self.pg0.remote_hosts[1]
1775 self.vapi.snat_add_det_map(host0.ip4n,
1777 socket.inet_aton(nat_ip),
1779 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1780 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1784 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
1785 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
1786 TCP(sport=port_in, dport=external_port))
1787 self.pg0.add_stream(p)
1788 self.pg_enable_capture(self.pg_interfaces)
1790 capture = self.pg1.get_capture(1)
1795 self.assertEqual(ip.src, nat_ip)
1796 self.assertEqual(ip.dst, self.pg1.remote_ip4)
1797 self.assertEqual(tcp.dport, external_port)
1798 port_out0 = tcp.sport
1800 self.logger.error(ppp("Unexpected or invalid packet:", p))
1804 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
1805 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
1806 TCP(sport=port_in, dport=external_port))
1807 self.pg0.add_stream(p)
1808 self.pg_enable_capture(self.pg_interfaces)
1810 capture = self.pg1.get_capture(1)
1815 self.assertEqual(ip.src, nat_ip)
1816 self.assertEqual(ip.dst, self.pg1.remote_ip4)
1817 self.assertEqual(tcp.dport, external_port)
1818 port_out1 = tcp.sport
1820 self.logger.error(ppp("Unexpected or invalid packet:", p))
1823 dms = self.vapi.snat_det_map_dump()
1824 self.assertEqual(1, len(dms))
1825 self.assertEqual(2, dms[0].ses_num)
1828 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1829 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1830 TCP(sport=external_port, dport=port_out0))
1831 self.pg1.add_stream(p)
1832 self.pg_enable_capture(self.pg_interfaces)
1834 capture = self.pg0.get_capture(1)
1839 self.assertEqual(ip.src, self.pg1.remote_ip4)
1840 self.assertEqual(ip.dst, host0.ip4)
1841 self.assertEqual(tcp.dport, port_in)
1842 self.assertEqual(tcp.sport, external_port)
1844 self.logger.error(ppp("Unexpected or invalid packet:", p))
1848 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1849 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1850 TCP(sport=external_port, dport=port_out1))
1851 self.pg1.add_stream(p)
1852 self.pg_enable_capture(self.pg_interfaces)
1854 capture = self.pg0.get_capture(1)
1859 self.assertEqual(ip.src, self.pg1.remote_ip4)
1860 self.assertEqual(ip.dst, host1.ip4)
1861 self.assertEqual(tcp.dport, port_in)
1862 self.assertEqual(tcp.sport, external_port)
1864 self.logger.error(ppp("Unexpected or invalid packet", p))
1867 # session close api test
1868 self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
1870 self.pg1.remote_ip4n,
1872 dms = self.vapi.snat_det_map_dump()
1873 self.assertEqual(dms[0].ses_num, 1)
1875 self.vapi.snat_det_close_session_in(host0.ip4n,
1877 self.pg1.remote_ip4n,
1879 dms = self.vapi.snat_det_map_dump()
1880 self.assertEqual(dms[0].ses_num, 0)
1882 def test_tcp_session_close_detection_in(self):
1883 """ CGNAT TCP session close initiated from inside network """
1884 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1886 socket.inet_aton(self.snat_addr),
1888 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1889 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1892 self.initiate_tcp_session(self.pg0, self.pg1)
1894 # close the session from inside
1896 # FIN packet in -> out
1897 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1898 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1899 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1901 self.pg0.add_stream(p)
1902 self.pg_enable_capture(self.pg_interfaces)
1904 self.pg1.get_capture(1)
1908 # ACK packet out -> in
1909 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1910 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1911 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1915 # FIN packet out -> in
1916 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1917 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1918 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1922 self.pg1.add_stream(pkts)
1923 self.pg_enable_capture(self.pg_interfaces)
1925 self.pg0.get_capture(2)
1927 # ACK packet in -> out
1928 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1929 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1930 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1932 self.pg0.add_stream(p)
1933 self.pg_enable_capture(self.pg_interfaces)
1935 self.pg1.get_capture(1)
1937 # Check if snat closed the session
1938 dms = self.vapi.snat_det_map_dump()
1939 self.assertEqual(0, dms[0].ses_num)
1941 self.logger.error("TCP session termination failed")
1944 def test_tcp_session_close_detection_out(self):
1945 """ CGNAT TCP session close initiated from outside network """
1946 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1948 socket.inet_aton(self.snat_addr),
1950 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1951 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1954 self.initiate_tcp_session(self.pg0, self.pg1)
1956 # close the session from outside
1958 # FIN packet out -> in
1959 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1960 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1961 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1963 self.pg1.add_stream(p)
1964 self.pg_enable_capture(self.pg_interfaces)
1966 self.pg0.get_capture(1)
1970 # ACK packet in -> out
1971 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1972 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1973 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1977 # ACK packet in -> out
1978 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1979 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1980 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1984 self.pg0.add_stream(pkts)
1985 self.pg_enable_capture(self.pg_interfaces)
1987 self.pg1.get_capture(2)
1989 # ACK packet out -> in
1990 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1991 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1992 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1994 self.pg1.add_stream(p)
1995 self.pg_enable_capture(self.pg_interfaces)
1997 self.pg0.get_capture(1)
1999 # Check if snat closed the session
2000 dms = self.vapi.snat_det_map_dump()
2001 self.assertEqual(0, dms[0].ses_num)
2003 self.logger.error("TCP session termination failed")
2006 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2007 def test_session_timeout(self):
2008 """ CGNAT session timeouts """
2009 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2011 socket.inet_aton(self.snat_addr),
2013 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2014 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2017 self.initiate_tcp_session(self.pg0, self.pg1)
2018 self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2019 pkts = self.create_stream_in(self.pg0, self.pg1)
2020 self.pg0.add_stream(pkts)
2021 self.pg_enable_capture(self.pg_interfaces)
2023 capture = self.pg1.get_capture(len(pkts))
2026 dms = self.vapi.snat_det_map_dump()
2027 self.assertEqual(0, dms[0].ses_num)
2029 def test_session_limit_per_user(self):
2030 """ CGNAT maximum 1000 sessions per user should be created """
2031 self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2033 socket.inet_aton(self.snat_addr),
2035 self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2036 self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2038 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2039 src_address=self.pg2.local_ip4n,
2041 template_interval=10)
2042 self.vapi.snat_ipfix()
2045 for port in range(1025, 2025):
2046 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2047 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2048 UDP(sport=port, dport=port))
2051 self.pg0.add_stream(pkts)
2052 self.pg_enable_capture(self.pg_interfaces)
2054 capture = self.pg1.get_capture(len(pkts))
2056 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2057 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2058 UDP(sport=3001, dport=3002))
2059 self.pg0.add_stream(p)
2060 self.pg_enable_capture(self.pg_interfaces)
2062 capture = self.pg1.assert_nothing_captured()
2064 # verify ICMP error packet
2065 capture = self.pg0.get_capture(1)
2067 self.assertTrue(p.haslayer(ICMP))
2069 self.assertEqual(icmp.type, 3)
2070 self.assertEqual(icmp.code, 1)
2071 self.assertTrue(icmp.haslayer(IPerror))
2072 inner_ip = icmp[IPerror]
2073 self.assertEqual(inner_ip[UDPerror].sport, 3001)
2074 self.assertEqual(inner_ip[UDPerror].dport, 3002)
2076 dms = self.vapi.snat_det_map_dump()
2078 self.assertEqual(1000, dms[0].ses_num)
2080 # verify IPFIX logging
2081 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2082 capture = self.pg2.get_capture(2)
2083 ipfix = IPFIXDecoder()
2084 # first load template
2086 self.assertTrue(p.haslayer(IPFIX))
2087 if p.haslayer(Template):
2088 ipfix.add_template(p.getlayer(Template))
2089 # verify events in data set
2091 if p.haslayer(Data):
2092 data = ipfix.decode_data_set(p.getlayer(Set))
2093 self.verify_ipfix_max_entries_per_user(data)
2095 def clear_snat(self):
2097 Clear SNAT configuration.
2099 self.vapi.snat_ipfix(enable=0)
2100 self.vapi.snat_det_set_timeouts()
2101 deterministic_mappings = self.vapi.snat_det_map_dump()
2102 for dsm in deterministic_mappings:
2103 self.vapi.snat_add_det_map(dsm.in_addr,
2109 interfaces = self.vapi.snat_interface_dump()
2110 for intf in interfaces:
2111 self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2116 super(TestDeterministicNAT, self).tearDown()
2117 if not self.vpp_dead:
2118 self.logger.info(self.vapi.cli("show snat detail"))
# Script entry point: when this module is executed directly, run its test
# cases through unittest using VppTestRunner as the runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)