VPP-279: Document changes for vnet/vnet/devices
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.l2 import Ether
10 from util import ppp
11
12
# NOTE: the class docstring and every test-method docstring are kept exactly
# as they were; unittest (shortDescription) and the test runner may display
# them, so they are treated as runtime text rather than free-form comments.
class TestSNAT(VppTestCase):
    """ SNAT Test Cases """

    @classmethod
    def setUpClass(cls):
        """
        Create and configure the packet-generator interfaces shared by
        every test in this class.

        Eight pg interfaces are created:

        * pg0-pg3: brought up, given IPv4 addresses, ARP resolved; pg0
          additionally gets two remote hosts (used by the hairpinning test)
        * pg4-pg6: all re-addressed to the same 172.16.255.1 (local) /
          172.16.255.2 (remote) pair, each bound to the IPv4 table whose id
          is the interface's own sw_if_index
        * pg7: only set admin-up, no IPv4 address configured here

        If any step fails the parent tearDownClass() is invoked before the
        exception is re-raised.
        """
        super(TestSNAT, cls).setUpClass()

        try:
            # Default inside ports/ids and the outside values expected
            # after translation; tests overwrite the *_out values on the
            # instance when they need different ones.
            cls.tcp_port_in = 6303
            cls.tcp_port_out = 6303
            cls.udp_port_in = 6304
            cls.udp_port_out = 6304
            cls.icmp_id_in = 6305
            cls.icmp_id_out = 6305
            cls.snat_addr = '10.0.0.3'

            cls.create_pg_interfaces(range(8))
            cls.interfaces = list(cls.pg_interfaces[0:4])

            for i in cls.interfaces:
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()

            cls.pg0.generate_remote_hosts(2)
            cls.pg0.configure_ipv4_neighbors()

            cls.overlapping_interfaces = list(cls.pg_interfaces[4:7])

            for i in cls.overlapping_interfaces:
                # Force the same address pair onto each interface so their
                # address spaces overlap.
                i._local_ip4 = "172.16.255.1"
                i._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
                i._remote_hosts[0]._ip4 = "172.16.255.2"
                i.set_table_ip4(i.sw_if_index)
                i.config_ip4()
                i.admin_up()
                i.resolve_arp()

            cls.pg7.admin_up()

        except Exception:
            super(TestSNAT, cls).tearDownClass()
            raise

    def _snat_send(self, in_if, pkts):
        """
        Queue packets on an interface and run the packet generator

        :param in_if: Interface the packets are injected on
        :param pkts: Packet or list of packets to send
        """
        in_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def _snat_send_and_capture(self, in_if, pkts, out_if, expected=None):
        """
        Send packets on one interface and return the capture of another

        :param in_if: Interface the packets are injected on
        :param pkts: Packet or list of packets to send
        :param out_if: Interface the capture is read from
        :param expected: Number of packets requested from the capture
                         (Default len(pkts); pass it explicitly when pkts
                         is a single packet rather than a list)
        :returns: Whatever out_if.get_capture() returns
        """
        self._snat_send(in_if, pkts)
        if expected is None:
            expected = len(pkts)
        return out_if.get_capture(expected)

    def create_stream_in(self, in_if, out_if):
        """
        Create packet stream for inside network

        One TCP, one UDP and one ICMP echo-request packet are built, in
        that order, sourced from in_if's remote host and destined to
        out_if's remote host.

        :param in_if: Inside interface
        :param out_if: Outside interface
        :returns: List of the three packets
        """
        payloads = (
            TCP(sport=self.tcp_port_in),
            UDP(sport=self.udp_port_in),
            ICMP(id=self.icmp_id_in, type='echo-request'),
        )
        return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
                payload
                for payload in payloads]

    def create_stream_out(self, out_if, dst_ip=None):
        """
        Create packet stream for outside network

        One TCP, one UDP and one ICMP echo-reply packet are built, in that
        order, addressed to the current outside ports / ICMP id.

        :param out_if: Outside interface
        :param dst_ip: Destination IP address (Default use global SNAT address)
        :returns: List of the three packets
        """
        if dst_ip is None:
            dst_ip = self.snat_addr
        payloads = (
            TCP(dport=self.tcp_port_out),
            UDP(dport=self.udp_port_out),
            ICMP(id=self.icmp_id_out, type='echo-reply'),
        )
        return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                IP(src=out_if.remote_ip4, dst=dst_ip) /
                payload
                for payload in payloads]

    def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                           packet_num=3):
        """
        Verify captured packets on outside network

        Besides checking, the source port / ICMP id seen on each packet is
        stored in tcp_port_out / udp_port_out / icmp_id_out so that a
        following create_stream_out() addresses the translated values.

        :param capture: Captured packets
        :param nat_ip: Translated IP address (Default use global SNAT address)
        :param same_port: Source port number is not translated (Default False)
        :param packet_num: Expected number of packets (Default 3)
        """
        if nat_ip is None:
            nat_ip = self.snat_addr
        # With same_port the value must be unchanged, otherwise it must
        # differ from the inside one.
        check = self.assertEqual if same_port else self.assertNotEqual
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, nat_ip)
                if packet.haslayer(TCP):
                    check(packet[TCP].sport, self.tcp_port_in)
                    self.tcp_port_out = packet[TCP].sport
                elif packet.haslayer(UDP):
                    check(packet[UDP].sport, self.udp_port_in)
                    self.udp_port_out = packet[UDP].sport
                else:
                    check(packet[ICMP].id, self.icmp_id_in)
                    self.icmp_id_out = packet[ICMP].id
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def verify_capture_in(self, capture, in_if, packet_num=3):
        """
        Verify captured packets on inside network

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def clear_snat(self):
        """
        Clear SNAT configuration.

        Everything reported by the four dump calls is deleted again, in
        this order: interface-address bindings, interface features, static
        mappings, pool addresses.
        """
        for intf in self.vapi.snat_interface_addr_dump():
            self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)

        for intf in self.vapi.snat_interface_dump():
            self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
                                                     intf.is_inside,
                                                     is_add=0)

        for sm in self.vapi.snat_static_mapping_dump():
            self.vapi.snat_add_static_mapping(sm.local_ip_address,
                                              sm.external_ip_address,
                                              local_port=sm.local_port,
                                              external_port=sm.external_port,
                                              addr_only=sm.addr_only,
                                              vrf_id=sm.vrf_id,
                                              is_add=0)

        for addr in self.vapi.snat_address_dump():
            self.vapi.snat_add_address_range(addr.ip_address,
                                             addr.ip_address,
                                             is_add=0)

    def snat_add_static_mapping(self, local_ip, external_ip, local_port=0,
                                external_port=0, vrf_id=0, is_add=1):
        """
        Add/delete S-NAT static mapping

        The mapping is address-only unless both ports are non-zero.

        :param local_ip: Local IP address
        :param external_ip: External IP address
        :param local_port: Local port number (Optional)
        :param external_port: External port number (Optional)
        :param vrf_id: VRF ID (Default 0)
        :param is_add: 1 if add, 0 if delete (Default add)
        """
        addr_only = 0 if (local_port and external_port) else 1
        l_ip = socket.inet_pton(socket.AF_INET, local_ip)
        e_ip = socket.inet_pton(socket.AF_INET, external_ip)
        self.vapi.snat_add_static_mapping(
            l_ip,
            e_ip,
            local_port,
            external_port,
            addr_only,
            vrf_id,
            is_add)

    def snat_add_address(self, ip, is_add=1):
        """
        Add/delete S-NAT address

        The address is passed as a one-element range (first == last).

        :param ip: IP address
        :param is_add: 1 if add, 0 if delete (Default add)
        """
        snat_addr = socket.inet_pton(socket.AF_INET, ip)
        self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add)

    def test_dynamic(self):
        """ SNAT dynamic translation test """

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        capture = self._snat_send_and_capture(self.pg0, pkts, self.pg1)
        self.verify_capture_out(capture)

        # out2in
        pkts = self.create_stream_out(self.pg1)
        capture = self._snat_send_and_capture(self.pg1, pkts, self.pg0)
        self.verify_capture_in(capture, self.pg0)

    def test_static_in(self):
        """ SNAT 1:1 NAT initialized from inside network """

        nat_ip = "10.0.0.10"
        self.tcp_port_out = 6303
        self.udp_port_out = 6304
        self.icmp_id_out = 6305

        self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        capture = self._snat_send_and_capture(self.pg0, pkts, self.pg1)
        self.verify_capture_out(capture, nat_ip, True)

        # out2in
        pkts = self.create_stream_out(self.pg1, nat_ip)
        capture = self._snat_send_and_capture(self.pg1, pkts, self.pg0)
        self.verify_capture_in(capture, self.pg0)

    def test_static_out(self):
        """ SNAT 1:1 NAT initialized from outside network """

        nat_ip = "10.0.0.20"
        self.tcp_port_out = 6303
        self.udp_port_out = 6304
        self.icmp_id_out = 6305

        self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # out2in
        pkts = self.create_stream_out(self.pg1, nat_ip)
        capture = self._snat_send_and_capture(self.pg1, pkts, self.pg0)
        self.verify_capture_in(capture, self.pg0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        capture = self._snat_send_and_capture(self.pg0, pkts, self.pg1)
        self.verify_capture_out(capture, nat_ip, True)

    def test_static_with_port_in(self):
        """ SNAT 1:1 NAT with port initialized from inside network """

        self.tcp_port_out = 3606
        self.udp_port_out = 3607
        self.icmp_id_out = 3608

        self.snat_add_address(self.snat_addr)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.tcp_port_in, self.tcp_port_out)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.udp_port_in, self.udp_port_out)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.icmp_id_in, self.icmp_id_out)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        capture = self._snat_send_and_capture(self.pg0, pkts, self.pg1)
        self.verify_capture_out(capture)

        # out2in
        pkts = self.create_stream_out(self.pg1)
        capture = self._snat_send_and_capture(self.pg1, pkts, self.pg0)
        self.verify_capture_in(capture, self.pg0)

    def test_static_with_port_out(self):
        """ SNAT 1:1 NAT with port initialized from outside network """

        self.tcp_port_out = 30606
        self.udp_port_out = 30607
        self.icmp_id_out = 30608

        self.snat_add_address(self.snat_addr)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.tcp_port_in, self.tcp_port_out)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.udp_port_in, self.udp_port_out)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.icmp_id_in, self.icmp_id_out)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # out2in
        pkts = self.create_stream_out(self.pg1)
        capture = self._snat_send_and_capture(self.pg1, pkts, self.pg0)
        self.verify_capture_in(capture, self.pg0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        capture = self._snat_send_and_capture(self.pg0, pkts, self.pg1)
        self.verify_capture_out(capture)

    def test_static_vrf_aware(self):
        """ SNAT 1:1 NAT VRF awareness """

        nat_ip1 = "10.0.0.30"
        nat_ip2 = "10.0.0.40"
        self.tcp_port_out = 6303
        self.udp_port_out = 6304
        self.icmp_id_out = 6305

        # Both mappings use pg4's table id; only the first one belongs to
        # an interface that is actually in that table.
        self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
                                     vrf_id=self.pg4.sw_if_index)
        self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
                                     vrf_id=self.pg4.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                                 is_inside=0)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)

        # inside interface VRF match SNAT static mapping VRF
        pkts = self.create_stream_in(self.pg4, self.pg3)
        capture = self._snat_send_and_capture(self.pg4, pkts, self.pg3)
        self.verify_capture_out(capture, nat_ip1, True)

        # inside interface VRF don't match SNAT static mapping VRF (packets
        # are dropped)
        pkts = self.create_stream_in(self.pg0, self.pg3)
        self._snat_send(self.pg0, pkts)
        self.pg3.assert_nothing_captured()

    def test_multiple_inside_interfaces(self):
        """
        SNAT multiple inside interfaces with non-overlapping address space
        """

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                                 is_inside=0)

        # 1st, 2nd and 3rd inside interface in turn, all behind pg3
        for in_if in (self.pg0, self.pg1, self.pg2):
            # in2out
            pkts = self.create_stream_in(in_if, self.pg3)
            capture = self._snat_send_and_capture(in_if, pkts, self.pg3)
            self.verify_capture_out(capture)

            # out2in
            pkts = self.create_stream_out(self.pg3)
            capture = self._snat_send_and_capture(self.pg3, pkts, in_if)
            self.verify_capture_in(capture, in_if)

    def test_inside_overlapping_interfaces(self):
        """ SNAT multiple inside interfaces with overlapping address space """

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                                 is_inside=0)
        self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)

        # 1st, 2nd and 3rd inside interface in turn, all behind pg3
        for in_if in (self.pg4, self.pg5, self.pg6):
            # in2out
            pkts = self.create_stream_in(in_if, self.pg3)
            capture = self._snat_send_and_capture(in_if, pkts, self.pg3)
            self.verify_capture_out(capture)

            # out2in
            pkts = self.create_stream_out(self.pg3)
            capture = self._snat_send_and_capture(self.pg3, pkts, in_if)
            self.verify_capture_in(capture, in_if)

    def test_hairpinning(self):
        """ SNAT hairpinning """

        host = self.pg0.remote_hosts[0]
        server = self.pg0.remote_hosts[1]
        host_in_port = 1234
        server_in_port = 5678
        server_out_port = 8765

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)
        # add static mapping for server
        self.snat_add_static_mapping(server.ip4, self.snat_addr,
                                     server_in_port, server_out_port)

        # send packet from host to server
        p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
             IP(src=host.ip4, dst=self.snat_addr) /
             TCP(sport=host_in_port, dport=server_out_port))
        # both hosts sit behind pg0, so the translated packet comes back
        # out of the interface it was sent on
        capture = self._snat_send_and_capture(self.pg0, p, self.pg0, 1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.snat_addr)
            self.assertEqual(ip.dst, server.ip4)
            self.assertNotEqual(tcp.sport, host_in_port)
            self.assertEqual(tcp.dport, server_in_port)
            # remember the translated source port for the reply
            host_out_port = tcp.sport
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        # send reply from server to host
        p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
             IP(src=server.ip4, dst=self.snat_addr) /
             TCP(sport=server_in_port, dport=host_out_port))
        capture = self._snat_send_and_capture(self.pg0, p, self.pg0, 1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.snat_addr)
            self.assertEqual(ip.dst, host.ip4)
            self.assertEqual(tcp.sport, server_out_port)
            self.assertEqual(tcp.dport, host_in_port)
        except Exception:
            # the packet is an argument of ppp(), not of logger.error()
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

    def test_max_translations_per_user(self):
        """ MAX translations per user - recycle the least recently used """

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # get maximum number of translations per user
        snat_config = self.vapi.snat_show_config()

        # send more than maximum number of translations per user packets,
        # each from a distinct TCP source port starting at 1025
        pkts_num = snat_config.max_translations_per_user + 5
        pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                TCP(sport=1025 + port)
                for port in range(0, pkts_num)]

        # verify number of translated packet
        self._snat_send_and_capture(self.pg0, pkts, self.pg1, pkts_num)

    def test_interface_addr(self):
        """ Acquire SNAT addresses from interface """
        self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)

        # no address in NAT pool
        addresses = self.vapi.snat_address_dump()
        self.assertEqual(0, len(addresses))

        # configure interface address and check NAT address pool
        self.pg7.config_ip4()
        addresses = self.vapi.snat_address_dump()
        self.assertEqual(1, len(addresses))

        # remove interface address and check NAT address pool
        self.pg7.unconfig_ip4()
        addresses = self.vapi.snat_address_dump()
        self.assertEqual(0, len(addresses))

    def tearDown(self):
        """
        Log the SNAT state and remove all SNAT configuration after each
        test, unless VPP is reported dead.
        """
        super(TestSNAT, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(self.vapi.cli("show snat verbose"))
            self.clear_snat()
655
# Running this file directly executes the tests above with the VPP runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)