2 from __future__ import print_function
10 from framework import VppTestCase, VppTestRunner, running_extended_tests
11 from vpp_object import VppObject
12 from vpp_pg_interface import CaptureTimeoutError
13 from vpp_ip_route import VppIpRoute, VppRoutePath
14 from ipaddress import ip_address, IPv4Address, IPv6Address
15 from socket import AF_INET, AF_INET6
18 class TestIpfixExporter(VppTestCase):
19 """ Ipfix Exporter Tests """
22 super(TestIpfixExporter, self).setUp()
23 self.create_pg_interfaces(range(4))
24 for i in self.pg_interfaces:
33 super(TestIpfixExporter, self).tearDown()
34 for i in self.pg_interfaces:
39 def find_exp_by_collector_addr(self, exporters, addr):
40 """ Find the exporter in the list of exportes with the given addr """
43 if exp.collector_address == IPv4Address(addr):
47 def verify_exporter_detail(self, exp, collector_addr, src_addr,
48 collector_port=4739, mtu=1400, interval=20):
49 self.assertTrue(exp is not None)
50 self.assert_equal(exp.collector_address, collector_addr)
51 self.assert_equal(exp.src_address, src_addr)
52 self.assert_equal(exp.collector_port, collector_port)
53 self.assert_equal(exp.path_mtu, mtu)
54 self.assert_equal(exp.template_interval, interval)
56 def test_create_multipe_exporters(self):
57 """ test that we can create and dump multiple exporters """
63 # Old API - always gives us pool index 0.
64 self.vapi.set_ipfix_exporter(
65 collector_address=self.pg1.remote_ip4,
66 src_address=self.pg0.local_ip4,
69 template_interval=interval)
71 exporters = self.vapi.ipfix_exporter_dump()
72 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
73 self.verify_exporter_detail(exp,
74 IPv4Address(self.pg1.remote_ip4),
75 IPv4Address(self.pg0.local_ip4))
77 exporters = list(self.vapi.vpp.details_iter(
78 self.vapi.ipfix_all_exporter_get))
79 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
80 self.verify_exporter_detail(exp,
81 IPv4Address(self.pg1.remote_ip4),
82 IPv4Address(self.pg0.local_ip4))
84 # create a 2nd exporter
85 self.vapi.ipfix_exporter_create_delete(
86 collector_address=self.pg2.remote_ip4,
87 src_address=self.pg0.local_ip4,
90 template_interval=interval,
93 exporters = list(self.vapi.vpp.details_iter(
94 self.vapi.ipfix_all_exporter_get))
95 self.assertTrue(len(exporters) == 2)
96 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
97 self.verify_exporter_detail(exp,
98 IPv4Address(self.pg1.remote_ip4),
99 IPv4Address(self.pg0.local_ip4))
100 exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4)
101 self.verify_exporter_detail(exp,
102 IPv4Address(self.pg2.remote_ip4),
103 IPv4Address(self.pg0.local_ip4))
105 # Create a 3rd exporter
106 self.vapi.ipfix_exporter_create_delete(
107 collector_address=self.pg3.remote_ip4,
108 src_address=self.pg0.local_ip4,
111 template_interval=interval,
114 exporters = list(self.vapi.vpp.details_iter(
115 self.vapi.ipfix_all_exporter_get))
116 self.assertTrue(len(exporters) == 3)
117 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
118 self.verify_exporter_detail(exp,
119 IPv4Address(self.pg1.remote_ip4),
120 IPv4Address(self.pg0.local_ip4))
121 exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4)
122 self.verify_exporter_detail(exp,
123 IPv4Address(self.pg2.remote_ip4),
124 IPv4Address(self.pg0.local_ip4))
125 exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4)
126 self.verify_exporter_detail(exp,
127 IPv4Address(self.pg3.remote_ip4),
128 IPv4Address(self.pg0.local_ip4))
130 # Modify the 2nd exporter.
131 self.vapi.ipfix_exporter_create_delete(
132 collector_address=self.pg2.remote_ip4,
133 src_address=self.pg0.local_ip4,
136 template_interval=interval+1,
139 exporters = list(self.vapi.vpp.details_iter(
140 self.vapi.ipfix_all_exporter_get))
141 self.assertTrue(len(exporters) == 3)
142 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
143 self.verify_exporter_detail(exp,
144 IPv4Address(self.pg1.remote_ip4),
145 IPv4Address(self.pg0.local_ip4))
146 exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4)
147 self.verify_exporter_detail(exp,
148 IPv4Address(self.pg2.remote_ip4),
149 IPv4Address(self.pg0.local_ip4),
150 mtu=mtu+1, interval=interval+1)
151 exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4)
152 self.verify_exporter_detail(exp,
153 IPv4Address(self.pg3.remote_ip4),
154 IPv4Address(self.pg0.local_ip4))
156 # Delete 2nd exporter
157 self.vapi.ipfix_exporter_create_delete(
158 collector_address=self.pg2.remote_ip4,
159 src_address=self.pg0.local_ip4,
162 template_interval=interval,
165 exporters = list(self.vapi.vpp.details_iter(
166 self.vapi.ipfix_all_exporter_get))
167 self.assertTrue(len(exporters) == 2)
168 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
169 self.verify_exporter_detail(exp,
170 IPv4Address(self.pg1.remote_ip4),
171 IPv4Address(self.pg0.local_ip4))
172 exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4)
173 self.verify_exporter_detail(exp,
174 IPv4Address(self.pg3.remote_ip4),
175 IPv4Address(self.pg0.local_ip4))
177 # Delete final exporter (exporter in slot 0 can not be deleted)
178 self.vapi.ipfix_exporter_create_delete(
179 collector_address=self.pg3.remote_ip4,
180 src_address=self.pg0.local_ip4,
183 template_interval=interval,
186 exporters = list(self.vapi.vpp.details_iter(
187 self.vapi.ipfix_all_exporter_get))
188 self.assertTrue(len(exporters) == 1)
189 exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
190 self.verify_exporter_detail(exp,
191 IPv4Address(self.pg1.remote_ip4),
192 IPv4Address(self.pg0.local_ip4))