2 from framework import VppTestCase
3 from ipaddress import IPv4Address
6 class TestIpfixExporter(VppTestCase):
7 """Ipfix Exporter Tests"""
10 super(TestIpfixExporter, self).setUp()
11 self.create_pg_interfaces(range(4))
12 for i in self.pg_interfaces:
21 super(TestIpfixExporter, self).tearDown()
22 for i in self.pg_interfaces:
27 def find_exp_by_collector_addr(self, exporters, addr):
28 """Find the exporter in the list of exportes with the given addr"""
31 if exp.collector_address == IPv4Address(addr):
35 def verify_exporter_detail(
36 self, exp, collector_addr, src_addr, collector_port=4739, mtu=1400, interval=20
38 self.assertTrue(exp is not None)
39 self.assert_equal(exp.collector_address, collector_addr)
40 self.assert_equal(exp.src_address, src_addr)
41 self.assert_equal(exp.collector_port, collector_port)
42 self.assert_equal(exp.path_mtu, mtu)
43 self.assert_equal(exp.template_interval, interval)
45 def test_create_multipe_exporters(self):
46 """test that we can create and dump multiple exporters"""
52 # Old API - always gives us pool index 0.
53 self.vapi.set_ipfix_exporter(
54 collector_address=self.pg1.remote_ip4,
55 src_address=self.pg0.local_ip4,
58 template_interval=interval,
61 exporters = self.vapi.ipfix_exporter_dump()
62 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
63 self.verify_exporter_detail(
64 exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
67 exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
68 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
69 self.verify_exporter_detail(
70 exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
73 # create a 2nd exporter
74 self.vapi.ipfix_exporter_create_delete(
75 collector_address=self.pg2.remote_ip4,
76 src_address=self.pg0.local_ip4,
79 template_interval=interval,
83 exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
84 self.assertTrue(len(exporters) == 2)
85 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
86 self.verify_exporter_detail(
87 exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
89 exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4)
90 self.verify_exporter_detail(
91 exp, IPv4Address(self.pg2.remote_ip4), IPv4Address(self.pg0.local_ip4)
94 # Create a 3rd exporter
95 self.vapi.ipfix_exporter_create_delete(
96 collector_address=self.pg3.remote_ip4,
97 src_address=self.pg0.local_ip4,
100 template_interval=interval,
104 exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
105 self.assertTrue(len(exporters) == 3)
106 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
107 self.verify_exporter_detail(
108 exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
110 exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4)
111 self.verify_exporter_detail(
112 exp, IPv4Address(self.pg2.remote_ip4), IPv4Address(self.pg0.local_ip4)
114 exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4)
115 self.verify_exporter_detail(
116 exp, IPv4Address(self.pg3.remote_ip4), IPv4Address(self.pg0.local_ip4)
119 # Modify the 2nd exporter.
120 self.vapi.ipfix_exporter_create_delete(
121 collector_address=self.pg2.remote_ip4,
122 src_address=self.pg0.local_ip4,
125 template_interval=interval + 1,
129 exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
130 self.assertTrue(len(exporters) == 3)
131 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
132 self.verify_exporter_detail(
133 exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
135 exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4)
136 self.verify_exporter_detail(
138 IPv4Address(self.pg2.remote_ip4),
139 IPv4Address(self.pg0.local_ip4),
141 interval=interval + 1,
143 exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4)
144 self.verify_exporter_detail(
145 exp, IPv4Address(self.pg3.remote_ip4), IPv4Address(self.pg0.local_ip4)
148 # Delete 2nd exporter
149 self.vapi.ipfix_exporter_create_delete(
150 collector_address=self.pg2.remote_ip4,
151 src_address=self.pg0.local_ip4,
154 template_interval=interval,
158 exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
159 self.assertTrue(len(exporters) == 2)
160 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
161 self.verify_exporter_detail(
162 exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
164 exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4)
165 self.verify_exporter_detail(
166 exp, IPv4Address(self.pg3.remote_ip4), IPv4Address(self.pg0.local_ip4)
169 # Delete final exporter (exporter in slot 0 can not be deleted)
170 self.vapi.ipfix_exporter_create_delete(
171 collector_address=self.pg3.remote_ip4,
172 src_address=self.pg0.local_ip4,
175 template_interval=interval,
179 exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
180 self.assertTrue(len(exporters) == 1)
181 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
182 self.verify_exporter_detail(
183 exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)