tests: Use errno value rather than a specific int
[vpp.git] / test / test_ipfix_export.py
1 #!/usr/bin/env python3
2 from framework import VppTestCase
3 from ipaddress import IPv4Address
4
5
class TestIpfixExporter(VppTestCase):
    """Ipfix Exporter Tests"""

    def setUp(self):
        """Create four pg interfaces, bring them up and configure IPv4/IPv6."""
        super().setUp()
        self.create_pg_interfaces(range(4))
        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.disable_ipv6_ra()

    def tearDown(self):
        """Run the base-class teardown, then unconfigure and shut the interfaces."""
        super().tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()

    def find_exp_by_collector_addr(self, exporters, addr):
        """Find the exporter in the list of exporters with the given addr

        :param exporters: iterable of exporter details, each exposing a
            ``collector_address`` attribute.
        :param addr: collector IPv4 address in any form accepted by
            ``IPv4Address``.
        :returns: the first exporter whose collector address equals ``addr``,
            or None when there is no match.
        """
        # Convert once instead of on every loop iteration.
        wanted = IPv4Address(addr)
        for exp in exporters:
            if exp.collector_address == wanted:
                return exp
        return None

    def verify_exporter_detail(
        self, exp, collector_addr, src_addr, collector_port=4739, mtu=1400, interval=20
    ):
        """Assert that exporter ``exp`` exists and carries the expected settings.

        :param exp: exporter details to check; the check fails if it is None.
        :param collector_addr: expected ``collector_address``.
        :param src_addr: expected ``src_address``.
        :param collector_port: expected ``collector_port``.
        :param mtu: expected ``path_mtu``.
        :param interval: expected ``template_interval``.
        """
        self.assertIsNotNone(exp)
        self.assert_equal(exp.collector_address, collector_addr)
        self.assert_equal(exp.src_address, src_addr)
        self.assert_equal(exp.collector_port, collector_port)
        self.assert_equal(exp.path_mtu, mtu)
        self.assert_equal(exp.template_interval, interval)

    def _get_all_exporters(self, expected_count=None):
        """Return the list of exporters reported by ipfix_all_exporter_get.

        When ``expected_count`` is given, also assert the list has that length.
        """
        exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
        if expected_count is not None:
            self.assertEqual(len(exporters), expected_count)
        return exporters

    def _verify_exporter_for(self, exporters, collector_pg, **expected):
        """Look up the exporter whose collector is ``collector_pg`` and verify it.

        The source address is always expected to be pg0's local IPv4 address.
        Extra keyword arguments are passed on to verify_exporter_detail to
        override its default expected port / mtu / interval.
        """
        exp = self.find_exp_by_collector_addr(exporters, collector_pg.remote_ip4)
        self.verify_exporter_detail(
            exp,
            IPv4Address(collector_pg.remote_ip4),
            IPv4Address(self.pg0.local_ip4),
            **expected,
        )

    def _create_delete_exporter(self, collector_pg, *, is_create, port, mtu, interval):
        """Call ipfix_exporter_create_delete for the collector behind ``collector_pg``.

        The source address is always pg0's local IPv4 address.
        """
        self.vapi.ipfix_exporter_create_delete(
            collector_address=collector_pg.remote_ip4,
            src_address=self.pg0.local_ip4,
            collector_port=port,
            path_mtu=mtu,
            template_interval=interval,
            is_create=is_create,
        )

    def test_create_multipe_exporters(self):
        """test that we can create and dump multiple exporters"""

        # These values equal the defaults of verify_exporter_detail, so the
        # verification calls below only pass the fields that were modified.
        mtu = 1400
        interval = 20
        port = 4739

        # Old API - always gives us pool index 0.
        self.vapi.set_ipfix_exporter(
            collector_address=self.pg1.remote_ip4,
            src_address=self.pg0.local_ip4,
            collector_port=port,
            path_mtu=mtu,
            template_interval=interval,
        )

        # The exporter must be visible through the dump API ...
        exporters = self.vapi.ipfix_exporter_dump()
        self._verify_exporter_for(exporters, self.pg1)

        # ... and through the all-exporter get API.
        exporters = self._get_all_exporters()
        self._verify_exporter_for(exporters, self.pg1)

        # create a 2nd exporter
        self._create_delete_exporter(
            self.pg2, is_create=True, port=port, mtu=mtu, interval=interval
        )

        exporters = self._get_all_exporters(expected_count=2)
        self._verify_exporter_for(exporters, self.pg1)
        self._verify_exporter_for(exporters, self.pg2)

        # Create a 3rd exporter
        self._create_delete_exporter(
            self.pg3, is_create=True, port=port, mtu=mtu, interval=interval
        )

        exporters = self._get_all_exporters(expected_count=3)
        self._verify_exporter_for(exporters, self.pg1)
        self._verify_exporter_for(exporters, self.pg2)
        self._verify_exporter_for(exporters, self.pg3)

        # Modify the 2nd exporter.
        self._create_delete_exporter(
            self.pg2, is_create=True, port=port, mtu=mtu + 1, interval=interval + 1
        )

        exporters = self._get_all_exporters(expected_count=3)
        self._verify_exporter_for(exporters, self.pg1)
        self._verify_exporter_for(
            exporters, self.pg2, mtu=mtu + 1, interval=interval + 1
        )
        self._verify_exporter_for(exporters, self.pg3)

        # Delete 2nd exporter
        self._create_delete_exporter(
            self.pg2, is_create=False, port=port, mtu=mtu, interval=interval
        )

        exporters = self._get_all_exporters(expected_count=2)
        self._verify_exporter_for(exporters, self.pg1)
        self._verify_exporter_for(exporters, self.pg3)

        # Delete final exporter (exporter in slot 0 can not be deleted)
        self._create_delete_exporter(
            self.pg3, is_create=False, port=port, mtu=mtu, interval=interval
        )

        exporters = self._get_all_exporters(expected_count=1)
        self._verify_exporter_for(exporters, self.pg1)