tests: make tests less make dependent
[vpp.git] / test / test_ipfix_export.py
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 import binascii
4 import random
5 import socket
6 import unittest
7 import time
8 import re
9
10 from framework import VppTestCase
11 from vpp_object import VppObject
12 from vpp_pg_interface import CaptureTimeoutError
13 from vpp_ip_route import VppIpRoute, VppRoutePath
14 from ipaddress import ip_address, IPv4Address, IPv6Address
15 from socket import AF_INET, AF_INET6
16
17
18 class TestIpfixExporter(VppTestCase):
19     """ Ipfix Exporter Tests """
20
21     def setUp(self):
22         super(TestIpfixExporter, self).setUp()
23         self.create_pg_interfaces(range(4))
24         for i in self.pg_interfaces:
25             i.admin_up()
26             i.config_ip4()
27             i.resolve_arp()
28             i.config_ip6()
29             i.resolve_ndp()
30             i.disable_ipv6_ra()
31
32     def tearDown(self):
33         super(TestIpfixExporter, self).tearDown()
34         for i in self.pg_interfaces:
35             i.unconfig_ip4()
36             i.unconfig_ip6()
37             i.admin_down()
38
39     def find_exp_by_collector_addr(self, exporters, addr):
40         """ Find the exporter in the list of exportes with the given  addr """
41
42         for exp in exporters:
43             if exp.collector_address == IPv4Address(addr):
44                 return exp
45         return None
46
47     def verify_exporter_detail(self, exp, collector_addr, src_addr,
48                                collector_port=4739, mtu=1400, interval=20):
49         self.assertTrue(exp is not None)
50         self.assert_equal(exp.collector_address, collector_addr)
51         self.assert_equal(exp.src_address, src_addr)
52         self.assert_equal(exp.collector_port, collector_port)
53         self.assert_equal(exp.path_mtu, mtu)
54         self.assert_equal(exp.template_interval, interval)
55
56     def test_create_multipe_exporters(self):
57         """ test that we can create and dump multiple exporters """
58
59         mtu = 1400
60         interval = 20
61         port = 4739
62
63         # Old API - always gives us pool index 0.
64         self.vapi.set_ipfix_exporter(
65             collector_address=self.pg1.remote_ip4,
66             src_address=self.pg0.local_ip4,
67             collector_port=4739,
68             path_mtu=mtu,
69             template_interval=interval)
70
71         exporters = self.vapi.ipfix_exporter_dump()
72         exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
73         self.verify_exporter_detail(exp,
74                                     IPv4Address(self.pg1.remote_ip4),
75                                     IPv4Address(self.pg0.local_ip4))
76
77         exporters = list(self.vapi.vpp.details_iter(
78             self.vapi.ipfix_all_exporter_get))
79         exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
80         self.verify_exporter_detail(exp,
81                                     IPv4Address(self.pg1.remote_ip4),
82                                     IPv4Address(self.pg0.local_ip4))
83
84         # create a 2nd exporter
85         self.vapi.ipfix_exporter_create_delete(
86             collector_address=self.pg2.remote_ip4,
87             src_address=self.pg0.local_ip4,
88             collector_port=4739,
89             path_mtu=mtu,
90             template_interval=interval,
91             is_create=True)
92
93         exporters = list(self.vapi.vpp.details_iter(
94             self.vapi.ipfix_all_exporter_get))
95         self.assertTrue(len(exporters) == 2)
96         exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
97         self.verify_exporter_detail(exp,
98                                     IPv4Address(self.pg1.remote_ip4),
99                                     IPv4Address(self.pg0.local_ip4))
100         exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4)
101         self.verify_exporter_detail(exp,
102                                     IPv4Address(self.pg2.remote_ip4),
103                                     IPv4Address(self.pg0.local_ip4))
104
105         # Create a 3rd exporter
106         self.vapi.ipfix_exporter_create_delete(
107             collector_address=self.pg3.remote_ip4,
108             src_address=self.pg0.local_ip4,
109             collector_port=4739,
110             path_mtu=mtu,
111             template_interval=interval,
112             is_create=True)
113
114         exporters = list(self.vapi.vpp.details_iter(
115             self.vapi.ipfix_all_exporter_get))
116         self.assertTrue(len(exporters) == 3)
117         exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
118         self.verify_exporter_detail(exp,
119                                     IPv4Address(self.pg1.remote_ip4),
120                                     IPv4Address(self.pg0.local_ip4))
121         exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4)
122         self.verify_exporter_detail(exp,
123                                     IPv4Address(self.pg2.remote_ip4),
124                                     IPv4Address(self.pg0.local_ip4))
125         exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4)
126         self.verify_exporter_detail(exp,
127                                     IPv4Address(self.pg3.remote_ip4),
128                                     IPv4Address(self.pg0.local_ip4))
129
130         # Modify the 2nd exporter.
131         self.vapi.ipfix_exporter_create_delete(
132             collector_address=self.pg2.remote_ip4,
133             src_address=self.pg0.local_ip4,
134             collector_port=4739,
135             path_mtu=mtu+1,
136             template_interval=interval+1,
137             is_create=True)
138
139         exporters = list(self.vapi.vpp.details_iter(
140             self.vapi.ipfix_all_exporter_get))
141         self.assertTrue(len(exporters) == 3)
142         exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
143         self.verify_exporter_detail(exp,
144                                     IPv4Address(self.pg1.remote_ip4),
145                                     IPv4Address(self.pg0.local_ip4))
146         exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4)
147         self.verify_exporter_detail(exp,
148                                     IPv4Address(self.pg2.remote_ip4),
149                                     IPv4Address(self.pg0.local_ip4),
150                                     mtu=mtu+1, interval=interval+1)
151         exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4)
152         self.verify_exporter_detail(exp,
153                                     IPv4Address(self.pg3.remote_ip4),
154                                     IPv4Address(self.pg0.local_ip4))
155
156         # Delete 2nd exporter
157         self.vapi.ipfix_exporter_create_delete(
158             collector_address=self.pg2.remote_ip4,
159             src_address=self.pg0.local_ip4,
160             collector_port=4739,
161             path_mtu=mtu,
162             template_interval=interval,
163             is_create=False)
164
165         exporters = list(self.vapi.vpp.details_iter(
166             self.vapi.ipfix_all_exporter_get))
167         self.assertTrue(len(exporters) == 2)
168         exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
169         self.verify_exporter_detail(exp,
170                                     IPv4Address(self.pg1.remote_ip4),
171                                     IPv4Address(self.pg0.local_ip4))
172         exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4)
173         self.verify_exporter_detail(exp,
174                                     IPv4Address(self.pg3.remote_ip4),
175                                     IPv4Address(self.pg0.local_ip4))
176
177         # Delete final exporter (exporter in slot 0 can not be deleted)
178         self.vapi.ipfix_exporter_create_delete(
179             collector_address=self.pg3.remote_ip4,
180             src_address=self.pg0.local_ip4,
181             collector_port=4739,
182             path_mtu=mtu,
183             template_interval=interval,
184             is_create=False)
185
186         exporters = list(self.vapi.vpp.details_iter(
187             self.vapi.ipfix_all_exporter_get))
188         self.assertTrue(len(exporters) == 1)
189         exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
190         self.verify_exporter_detail(exp,
191                                     IPv4Address(self.pg1.remote_ip4),
192                                     IPv4Address(self.pg0.local_ip4))