2 from __future__ import print_function
10 from framework import VppTestCase
11 from vpp_object import VppObject
12 from vpp_pg_interface import CaptureTimeoutError
13 from vpp_ip_route import VppIpRoute, VppRoutePath
14 from ipaddress import ip_address, IPv4Address, IPv6Address
15 from socket import AF_INET, AF_INET6
18 class TestIpfixExporter(VppTestCase):
19 """Ipfix Exporter Tests"""
22 super(TestIpfixExporter, self).setUp()
23 self.create_pg_interfaces(range(4))
24 for i in self.pg_interfaces:
33 super(TestIpfixExporter, self).tearDown()
34 for i in self.pg_interfaces:
def find_exp_by_collector_addr(self, exporters, addr):
    """Find the exporter in the list of exporters with the given addr.

    :param exporters: iterable of exporter entries; each entry must expose
        a ``collector_address`` attribute comparable to an ``ipaddress``
        address object.
    :param addr: collector address to look for, in any form accepted by
        ``ipaddress.ip_address`` (e.g. a dotted-quad or IPv6 string).
    :returns: the first entry whose ``collector_address`` equals ``addr``,
        or ``None`` when no entry matches.
    :raises ValueError: if ``addr`` is not a valid IPv4 or IPv6 address.
    """
    # Parse once, outside the scan; ip_address() picks the address family
    # from the value, so IPv6 collectors are matched as well as IPv4 ones.
    wanted = ip_address(addr)
    return next(
        (exp for exp in exporters if exp.collector_address == wanted),
        None,
    )
def verify_exporter_detail(
    self, exp, collector_addr, src_addr, collector_port=4739, mtu=1400, interval=20
):
    """Assert that an exporter entry carries the expected settings.

    :param exp: exporter entry to check; must not be ``None`` and must
        expose ``collector_address``, ``src_address``, ``collector_port``,
        ``path_mtu`` and ``template_interval`` attributes.
    :param collector_addr: expected value of ``exp.collector_address``.
    :param src_addr: expected value of ``exp.src_address``.
    :param collector_port: expected value of ``exp.collector_port``.
    :param mtu: expected value of ``exp.path_mtu``.
    :param interval: expected value of ``exp.template_interval``.
    """
    # Use the dedicated None assertion with a message so a failed lookup is
    # reported clearly instead of as a bare "False is not true".
    self.assertIsNotNone(exp, "exporter not found")
    # assert_equal comes from the test base class (not visible here);
    # it is called as (actual value, expected value).
    self.assert_equal(exp.collector_address, collector_addr)
    self.assert_equal(exp.src_address, src_addr)
    self.assert_equal(exp.collector_port, collector_port)
    self.assert_equal(exp.path_mtu, mtu)
    self.assert_equal(exp.template_interval, interval)
57 def test_create_multipe_exporters(self):
58 """test that we can create and dump multiple exporters"""
64 # Old API - always gives us pool index 0.
65 self.vapi.set_ipfix_exporter(
66 collector_address=self.pg1.remote_ip4,
67 src_address=self.pg0.local_ip4,
70 template_interval=interval,
73 exporters = self.vapi.ipfix_exporter_dump()
74 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
75 self.verify_exporter_detail(
76 exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
79 exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
80 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
81 self.verify_exporter_detail(
82 exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
85 # create a 2nd exporter
86 self.vapi.ipfix_exporter_create_delete(
87 collector_address=self.pg2.remote_ip4,
88 src_address=self.pg0.local_ip4,
91 template_interval=interval,
95 exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
96 self.assertTrue(len(exporters) == 2)
97 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
98 self.verify_exporter_detail(
99 exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
101 exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4)
102 self.verify_exporter_detail(
103 exp, IPv4Address(self.pg2.remote_ip4), IPv4Address(self.pg0.local_ip4)
106 # Create a 3rd exporter
107 self.vapi.ipfix_exporter_create_delete(
108 collector_address=self.pg3.remote_ip4,
109 src_address=self.pg0.local_ip4,
112 template_interval=interval,
116 exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
117 self.assertTrue(len(exporters) == 3)
118 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
119 self.verify_exporter_detail(
120 exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
122 exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4)
123 self.verify_exporter_detail(
124 exp, IPv4Address(self.pg2.remote_ip4), IPv4Address(self.pg0.local_ip4)
126 exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4)
127 self.verify_exporter_detail(
128 exp, IPv4Address(self.pg3.remote_ip4), IPv4Address(self.pg0.local_ip4)
131 # Modify the 2nd exporter.
132 self.vapi.ipfix_exporter_create_delete(
133 collector_address=self.pg2.remote_ip4,
134 src_address=self.pg0.local_ip4,
137 template_interval=interval + 1,
141 exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
142 self.assertTrue(len(exporters) == 3)
143 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
144 self.verify_exporter_detail(
145 exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
147 exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4)
148 self.verify_exporter_detail(
150 IPv4Address(self.pg2.remote_ip4),
151 IPv4Address(self.pg0.local_ip4),
153 interval=interval + 1,
155 exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4)
156 self.verify_exporter_detail(
157 exp, IPv4Address(self.pg3.remote_ip4), IPv4Address(self.pg0.local_ip4)
160 # Delete 2nd exporter
161 self.vapi.ipfix_exporter_create_delete(
162 collector_address=self.pg2.remote_ip4,
163 src_address=self.pg0.local_ip4,
166 template_interval=interval,
170 exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
171 self.assertTrue(len(exporters) == 2)
172 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
173 self.verify_exporter_detail(
174 exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
176 exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4)
177 self.verify_exporter_detail(
178 exp, IPv4Address(self.pg3.remote_ip4), IPv4Address(self.pg0.local_ip4)
181 # Delete final exporter (exporter in slot 0 can not be deleted)
182 self.vapi.ipfix_exporter_create_delete(
183 collector_address=self.pg3.remote_ip4,
184 src_address=self.pg0.local_ip4,
187 template_interval=interval,
191 exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
192 self.assertTrue(len(exporters) == 1)
193 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
194 self.verify_exporter_detail(
195 exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)