make test: improve handling of packet captures
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2
3 import unittest
4 import time
5 from random import randint
6 from bfd import *
7 from framework import *
8 from util import ppp
9
10
11 class BFDAPITestCase(VppTestCase):
12     """Bidirectional Forwarding Detection (BFD) - API"""
13
14     @classmethod
15     def setUpClass(cls):
16         super(BFDAPITestCase, cls).setUpClass()
17
18         try:
19             cls.create_pg_interfaces([0])
20             cls.pg0.config_ip4()
21             cls.pg0.resolve_arp()
22
23         except Exception:
24             super(BFDAPITestCase, cls).tearDownClass()
25             raise
26
27     def test_add_bfd(self):
28         """ create a BFD session """
29         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
30         session.add_vpp_config()
31         self.logger.debug("Session state is %s" % str(session.state))
32         session.remove_vpp_config()
33         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
34         session.add_vpp_config()
35         self.logger.debug("Session state is %s" % str(session.state))
36         session.remove_vpp_config()
37
38     def test_double_add(self):
39         """ create the same BFD session twice (negative case) """
40         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
41         session.add_vpp_config()
42         try:
43             session.add_vpp_config()
44         except:
45             session.remove_vpp_config()
46             return
47         session.remove_vpp_config()
48         raise Exception("Expected failure while adding duplicate "
49                         "configuration")
50
51
52 def create_packet(interface, ttl=255, src_port=50000, **kwargs):
53     p = (Ether(src=interface.remote_mac, dst=interface.local_mac) /
54          IP(src=interface.remote_ip4, dst=interface.local_ip4, ttl=ttl) /
55          UDP(sport=src_port, dport=BFD.udp_dport) /
56          BFD(*kwargs))
57     return p
58
59
60 def verify_ip(test, packet, local_ip, remote_ip):
61     """ Verify correctness of IP layer. """
62     ip = packet[IP]
63     test.assert_equal(ip.src, local_ip, "IP source address")
64     test.assert_equal(ip.dst, remote_ip, "IP destination address")
65     test.assert_equal(ip.ttl, 255, "IP TTL")
66
67
68 def verify_udp(test, packet):
69     """ Verify correctness of UDP layer. """
70     udp = packet[UDP]
71     test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
72     test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
73                          "UDP source port")
74
75
76 class BFDTestSession(object):
77
78     def __init__(self, test, interface, detect_mult=3):
79         self.test = test
80         self.interface = interface
81         self.bfd_values = {
82             'my_discriminator': 0,
83             'desired_min_tx_interval': 100000,
84             'detect_mult': detect_mult,
85             'diag': BFDDiagCode.no_diagnostic,
86         }
87
88     def update(self, **kwargs):
89         self.bfd_values.update(kwargs)
90
91     def create_packet(self):
92         packet = create_packet(self.interface)
93         for name, value in self.bfd_values.iteritems():
94             packet[BFD].setfieldval(name, value)
95         return packet
96
97     def send_packet(self):
98         p = self.create_packet()
99         self.test.logger.debug(ppp("Sending packet:", p))
100         self.test.pg0.add_stream([p])
101         self.test.pg_start()
102
103     def verify_packet(self, packet):
104         """ Verify correctness of BFD layer. """
105         bfd = packet[BFD]
106         self.test.assert_equal(bfd.version, 1, "BFD version")
107         self.test.assert_equal(bfd.your_discriminator,
108                                self.bfd_values['my_discriminator'],
109                                "BFD - your discriminator")
110
111
112 class BFDTestCase(VppTestCase):
113     """Bidirectional Forwarding Detection (BFD)"""
114
115     @classmethod
116     def setUpClass(cls):
117         super(BFDTestCase, cls).setUpClass()
118         try:
119             cls.create_pg_interfaces([0])
120             cls.pg0.config_ip4()
121             cls.pg0.generate_remote_hosts()
122             cls.pg0.configure_ipv4_neighbors()
123             cls.pg0.admin_up()
124             cls.pg0.resolve_arp()
125
126         except Exception:
127             super(BFDTestCase, cls).tearDownClass()
128             raise
129
130     def setUp(self):
131         super(BFDTestCase, self).setUp()
132         self.vapi.want_bfd_events()
133         self.vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
134         self.vpp_session.add_vpp_config()
135         self.vpp_session.admin_up()
136         self.test_session = BFDTestSession(self, self.pg0)
137
138     def tearDown(self):
139         self.vapi.want_bfd_events(enable_disable=0)
140         self.vapi.collect_events()  # clear the event queue
141         if not self.vpp_dead:
142             self.vpp_session.remove_vpp_config()
143         super(BFDTestCase, self).tearDown()
144
145     def verify_event(self, event, expected_state):
146         """ Verify correctness of event values. """
147         e = event
148         self.logger.debug("BFD: Event: %s" % repr(e))
149         self.assert_equal(e.bs_index, self.vpp_session.bs_index,
150                           "BFD session index")
151         self.assert_equal(e.sw_if_index, self.vpp_session.interface.sw_if_index,
152                           "BFD interface index")
153         is_ipv6 = 0
154         if self.vpp_session.af == AF_INET6:
155             is_ipv6 = 1
156         self.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
157         if self.vpp_session.af == AF_INET:
158             self.assert_equal(e.local_addr[:4], self.vpp_session.local_addr_n,
159                               "Local IPv4 address")
160             self.assert_equal(e.peer_addr[:4], self.vpp_session.peer_addr_n,
161                               "Peer IPv4 address")
162         else:
163             self.assert_equal(e.local_addr, self.vpp_session.local_addr_n,
164                               "Local IPv6 address")
165             self.assert_equal(e.peer_addr, self.vpp_session.peer_addr_n,
166                               "Peer IPv6 address")
167         self.assert_equal(e.state, expected_state, BFDState)
168
169     def wait_for_bfd_packet(self, timeout=1):
170         self.logger.info("BFD: Waiting for BFD packet")
171         p = self.pg0.wait_for_packet(timeout=timeout)
172         bfd = p[BFD]
173         if bfd is None:
174             raise Exception(ppp("Unexpected or invalid BFD packet:", p))
175         if bfd.payload:
176             raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
177         verify_ip(self, p, self.pg0.local_ip4, self.pg0.remote_ip4)
178         verify_udp(self, p)
179         self.test_session.verify_packet(p)
180         return p
181
182     def test_slow_timer(self):
183         """ verify slow periodic control frames while session down """
184         self.pg_enable_capture([self.pg0])
185         expected_packets = 3
186         self.logger.info("BFD: Waiting for %d BFD packets" % expected_packets)
187         self.wait_for_bfd_packet(2)
188         for i in range(expected_packets):
189             before = time.time()
190             self.wait_for_bfd_packet(2)
191             after = time.time()
192             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
193             # to work around timing issues
194             self.assert_in_range(
195                 after - before, 0.70, 1.05, "time between slow packets")
196             before = after
197
198     def test_zero_remote_min_rx(self):
199         """ no packets when zero BFD RemoteMinRxInterval """
200         self.pg_enable_capture([self.pg0])
201         p = self.wait_for_bfd_packet(2)
202         self.test_session.update(my_discriminator=randint(0, 40000000),
203                                  your_discriminator=p[BFD].my_discriminator,
204                                  state=BFDState.init,
205                                  required_min_rx_interval=0)
206         self.test_session.send_packet()
207         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
208         self.verify_event(e, expected_state=BFDState.up)
209
210         try:
211             p = self.pg0.wait_for_packet(timeout=1)
212         except:
213             return
214         raise Exception(ppp("Received unexpected BFD packet:", p))
215
216     def bfd_session_up(self):
217         self.pg_enable_capture([self.pg0])
218         self.logger.info("BFD: Waiting for slow hello")
219         p = self.wait_for_bfd_packet(2)
220         self.logger.info("BFD: Sending Init")
221         self.test_session.update(my_discriminator=randint(0, 40000000),
222                                  your_discriminator=p[BFD].my_discriminator,
223                                  state=BFDState.init,
224                                  required_min_rx_interval=100000)
225         self.test_session.send_packet()
226         self.logger.info("BFD: Waiting for event")
227         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
228         self.verify_event(e, expected_state=BFDState.up)
229         self.logger.info("BFD: Session is Up")
230         self.test_session.update(state=BFDState.up)
231
232     def test_session_up(self):
233         """ bring BFD session up """
234         self.bfd_session_up()
235
236     def test_hold_up(self):
237         """ hold BFD session up """
238         self.bfd_session_up()
239         for i in range(5):
240             self.wait_for_bfd_packet()
241             self.test_session.send_packet()
242
243     def test_conn_down(self):
244         """ verify session goes down after inactivity """
245         self.bfd_session_up()
246         self.wait_for_bfd_packet()
247         self.assert_equal(len(self.vapi.collect_events()), 0,
248                           "number of bfd events")
249         self.wait_for_bfd_packet()
250         self.assert_equal(len(self.vapi.collect_events()), 0,
251                           "number of bfd events")
252         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
253         self.verify_event(e, expected_state=BFDState.down)
254
255     def test_large_required_min_rx(self):
256         """ large remote RequiredMinRxInterval """
257         self.bfd_session_up()
258         interval = 3000000
259         self.test_session.update(required_min_rx_interval=interval)
260         self.test_session.send_packet()
261         now = time.time()
262         count = 0
263         while time.time() < now + interval / 1000000:
264             try:
265                 p = self.wait_for_bfd_packet()
266                 if count > 1:
267                     self.logger.error(ppp("Received unexpected packet:", p))
268                 count += 1
269             except:
270                 pass
271         self.assert_in_range(count, 0, 1, "number of packets received")
272
273
274 if __name__ == '__main__':
275     unittest.main(testRunner=VppTestRunner)