3afe94280257eda6957f8c9a138f54c58083b72e
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5 import unittest
6 import hashlib
7 import binascii
8 import time
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17     BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
20 from vpp_lo_interface import VppLoInterface
21 from util import ppp
22 from vpp_papi_provider import UnexpectedApiReturnValueError
23 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
24
25 USEC_IN_SEC = 1000000
26
27
28 class AuthKeyFactory(object):
29     """Factory class for creating auth keys with unique conf key ID"""
30
31     def __init__(self):
32         self._conf_key_ids = {}
33
34     def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
35         """ create a random key with unique conf key id """
36         conf_key_id = randint(0, 0xFFFFFFFF)
37         while conf_key_id in self._conf_key_ids:
38             conf_key_id = randint(0, 0xFFFFFFFF)
39         self._conf_key_ids[conf_key_id] = 1
40         key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
41         return VppBFDAuthKey(test=test, auth_type=auth_type,
42                              conf_key_id=conf_key_id, key=key)
43
44
45 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
46 class BFDAPITestCase(VppTestCase):
47     """Bidirectional Forwarding Detection (BFD) - API"""
48
49     pg0 = None
50     pg1 = None
51
52     @classmethod
53     def setUpClass(cls):
54         super(BFDAPITestCase, cls).setUpClass()
55         cls.vapi.cli("set log class bfd level debug")
56         try:
57             cls.create_pg_interfaces(range(2))
58             for i in cls.pg_interfaces:
59                 i.config_ip4()
60                 i.config_ip6()
61                 i.resolve_arp()
62
63         except Exception:
64             super(BFDAPITestCase, cls).tearDownClass()
65             raise
66
67     def setUp(self):
68         super(BFDAPITestCase, self).setUp()
69         self.factory = AuthKeyFactory()
70
71     def test_add_bfd(self):
72         """ create a BFD session """
73         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
74         session.add_vpp_config()
75         self.logger.debug("Session state is %s", session.state)
76         session.remove_vpp_config()
77         session.add_vpp_config()
78         self.logger.debug("Session state is %s", session.state)
79         session.remove_vpp_config()
80
81     def test_double_add(self):
82         """ create the same BFD session twice (negative case) """
83         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
84         session.add_vpp_config()
85
86         with self.vapi.expect_negative_api_retval():
87             session.add_vpp_config()
88
89         session.remove_vpp_config()
90
91     def test_add_bfd6(self):
92         """ create IPv6 BFD session """
93         session = VppBFDUDPSession(
94             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
95         session.add_vpp_config()
96         self.logger.debug("Session state is %s", session.state)
97         session.remove_vpp_config()
98         session.add_vpp_config()
99         self.logger.debug("Session state is %s", session.state)
100         session.remove_vpp_config()
101
102     def test_mod_bfd(self):
103         """ modify BFD session parameters """
104         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
105                                    desired_min_tx=50000,
106                                    required_min_rx=10000,
107                                    detect_mult=1)
108         session.add_vpp_config()
109         s = session.get_bfd_udp_session_dump_entry()
110         self.assert_equal(session.desired_min_tx,
111                           s.desired_min_tx,
112                           "desired min transmit interval")
113         self.assert_equal(session.required_min_rx,
114                           s.required_min_rx,
115                           "required min receive interval")
116         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
117         session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
118                                   required_min_rx=session.required_min_rx * 2,
119                                   detect_mult=session.detect_mult * 2)
120         s = session.get_bfd_udp_session_dump_entry()
121         self.assert_equal(session.desired_min_tx,
122                           s.desired_min_tx,
123                           "desired min transmit interval")
124         self.assert_equal(session.required_min_rx,
125                           s.required_min_rx,
126                           "required min receive interval")
127         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
128
129     def test_add_sha1_keys(self):
130         """ add SHA1 keys """
131         key_count = 10
132         keys = [self.factory.create_random_key(
133             self) for i in range(0, key_count)]
134         for key in keys:
135             self.assertFalse(key.query_vpp_config())
136         for key in keys:
137             key.add_vpp_config()
138         for key in keys:
139             self.assertTrue(key.query_vpp_config())
140         # remove randomly
141         indexes = range(key_count)
142         shuffle(indexes)
143         removed = []
144         for i in indexes:
145             key = keys[i]
146             key.remove_vpp_config()
147             removed.append(i)
148             for j in range(key_count):
149                 key = keys[j]
150                 if j in removed:
151                     self.assertFalse(key.query_vpp_config())
152                 else:
153                     self.assertTrue(key.query_vpp_config())
154         # should be removed now
155         for key in keys:
156             self.assertFalse(key.query_vpp_config())
157         # add back and remove again
158         for key in keys:
159             key.add_vpp_config()
160         for key in keys:
161             self.assertTrue(key.query_vpp_config())
162         for key in keys:
163             key.remove_vpp_config()
164         for key in keys:
165             self.assertFalse(key.query_vpp_config())
166
167     def test_add_bfd_sha1(self):
168         """ create a BFD session (SHA1) """
169         key = self.factory.create_random_key(self)
170         key.add_vpp_config()
171         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
172                                    sha1_key=key)
173         session.add_vpp_config()
174         self.logger.debug("Session state is %s", session.state)
175         session.remove_vpp_config()
176         session.add_vpp_config()
177         self.logger.debug("Session state is %s", session.state)
178         session.remove_vpp_config()
179
180     def test_double_add_sha1(self):
181         """ create the same BFD session twice (negative case) (SHA1) """
182         key = self.factory.create_random_key(self)
183         key.add_vpp_config()
184         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
185                                    sha1_key=key)
186         session.add_vpp_config()
187         with self.assertRaises(Exception):
188             session.add_vpp_config()
189
190     def test_add_auth_nonexistent_key(self):
191         """ create BFD session using non-existent SHA1 (negative case) """
192         session = VppBFDUDPSession(
193             self, self.pg0, self.pg0.remote_ip4,
194             sha1_key=self.factory.create_random_key(self))
195         with self.assertRaises(Exception):
196             session.add_vpp_config()
197
198     def test_shared_sha1_key(self):
199         """ share single SHA1 key between multiple BFD sessions """
200         key = self.factory.create_random_key(self)
201         key.add_vpp_config()
202         sessions = [
203             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
204                              sha1_key=key),
205             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
206                              sha1_key=key, af=AF_INET6),
207             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
208                              sha1_key=key),
209             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
210                              sha1_key=key, af=AF_INET6)]
211         for s in sessions:
212             s.add_vpp_config()
213         removed = 0
214         for s in sessions:
215             e = key.get_bfd_auth_keys_dump_entry()
216             self.assert_equal(e.use_count, len(sessions) - removed,
217                               "Use count for shared key")
218             s.remove_vpp_config()
219             removed += 1
220         e = key.get_bfd_auth_keys_dump_entry()
221         self.assert_equal(e.use_count, len(sessions) - removed,
222                           "Use count for shared key")
223
224     def test_activate_auth(self):
225         """ activate SHA1 authentication """
226         key = self.factory.create_random_key(self)
227         key.add_vpp_config()
228         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
229         session.add_vpp_config()
230         session.activate_auth(key)
231
232     def test_deactivate_auth(self):
233         """ deactivate SHA1 authentication """
234         key = self.factory.create_random_key(self)
235         key.add_vpp_config()
236         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
237         session.add_vpp_config()
238         session.activate_auth(key)
239         session.deactivate_auth()
240
241     def test_change_key(self):
242         """ change SHA1 key """
243         key1 = self.factory.create_random_key(self)
244         key2 = self.factory.create_random_key(self)
245         while key2.conf_key_id == key1.conf_key_id:
246             key2 = self.factory.create_random_key(self)
247         key1.add_vpp_config()
248         key2.add_vpp_config()
249         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
250                                    sha1_key=key1)
251         session.add_vpp_config()
252         session.activate_auth(key2)
253
254
255 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
256 class BFDTestSession(object):
257     """ BFD session as seen from test framework side """
258
259     def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
260                  bfd_key_id=None, our_seq_number=None):
261         self.test = test
262         self.af = af
263         self.sha1_key = sha1_key
264         self.bfd_key_id = bfd_key_id
265         self.interface = interface
266         self.udp_sport = randint(49152, 65535)
267         if our_seq_number is None:
268             self.our_seq_number = randint(0, 40000000)
269         else:
270             self.our_seq_number = our_seq_number
271         self.vpp_seq_number = None
272         self.my_discriminator = 0
273         self.desired_min_tx = 300000
274         self.required_min_rx = 300000
275         self.required_min_echo_rx = None
276         self.detect_mult = detect_mult
277         self.diag = BFDDiagCode.no_diagnostic
278         self.your_discriminator = None
279         self.state = BFDState.down
280         self.auth_type = BFDAuthType.no_auth
281
282     def inc_seq_num(self):
283         """ increment sequence number, wrapping if needed """
284         if self.our_seq_number == 0xFFFFFFFF:
285             self.our_seq_number = 0
286         else:
287             self.our_seq_number += 1
288
289     def update(self, my_discriminator=None, your_discriminator=None,
290                desired_min_tx=None, required_min_rx=None,
291                required_min_echo_rx=None, detect_mult=None,
292                diag=None, state=None, auth_type=None):
293         """ update BFD parameters associated with session """
294         if my_discriminator is not None:
295             self.my_discriminator = my_discriminator
296         if your_discriminator is not None:
297             self.your_discriminator = your_discriminator
298         if required_min_rx is not None:
299             self.required_min_rx = required_min_rx
300         if required_min_echo_rx is not None:
301             self.required_min_echo_rx = required_min_echo_rx
302         if desired_min_tx is not None:
303             self.desired_min_tx = desired_min_tx
304         if detect_mult is not None:
305             self.detect_mult = detect_mult
306         if diag is not None:
307             self.diag = diag
308         if state is not None:
309             self.state = state
310         if auth_type is not None:
311             self.auth_type = auth_type
312
313     def fill_packet_fields(self, packet):
314         """ set packet fields with known values in packet """
315         bfd = packet[BFD]
316         if self.my_discriminator:
317             self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
318                                    self.my_discriminator)
319             bfd.my_discriminator = self.my_discriminator
320         if self.your_discriminator:
321             self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
322                                    self.your_discriminator)
323             bfd.your_discriminator = self.your_discriminator
324         if self.required_min_rx:
325             self.test.logger.debug(
326                 "BFD: setting packet.required_min_rx_interval=%s",
327                 self.required_min_rx)
328             bfd.required_min_rx_interval = self.required_min_rx
329         if self.required_min_echo_rx:
330             self.test.logger.debug(
331                 "BFD: setting packet.required_min_echo_rx=%s",
332                 self.required_min_echo_rx)
333             bfd.required_min_echo_rx_interval = self.required_min_echo_rx
334         if self.desired_min_tx:
335             self.test.logger.debug(
336                 "BFD: setting packet.desired_min_tx_interval=%s",
337                 self.desired_min_tx)
338             bfd.desired_min_tx_interval = self.desired_min_tx
339         if self.detect_mult:
340             self.test.logger.debug(
341                 "BFD: setting packet.detect_mult=%s", self.detect_mult)
342             bfd.detect_mult = self.detect_mult
343         if self.diag:
344             self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
345             bfd.diag = self.diag
346         if self.state:
347             self.test.logger.debug("BFD: setting packet.state=%s", self.state)
348             bfd.state = self.state
349         if self.auth_type:
350             # this is used by a negative test-case
351             self.test.logger.debug("BFD: setting packet.auth_type=%s",
352                                    self.auth_type)
353             bfd.auth_type = self.auth_type
354
355     def create_packet(self):
356         """ create a BFD packet, reflecting the current state of session """
357         if self.sha1_key:
358             bfd = BFD(flags="A")
359             bfd.auth_type = self.sha1_key.auth_type
360             bfd.auth_len = BFD.sha1_auth_len
361             bfd.auth_key_id = self.bfd_key_id
362             bfd.auth_seq_num = self.our_seq_number
363             bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
364         else:
365             bfd = BFD()
366         if self.af == AF_INET6:
367             packet = (Ether(src=self.interface.remote_mac,
368                             dst=self.interface.local_mac) /
369                       IPv6(src=self.interface.remote_ip6,
370                            dst=self.interface.local_ip6,
371                            hlim=255) /
372                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
373                       bfd)
374         else:
375             packet = (Ether(src=self.interface.remote_mac,
376                             dst=self.interface.local_mac) /
377                       IP(src=self.interface.remote_ip4,
378                          dst=self.interface.local_ip4,
379                          ttl=255) /
380                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
381                       bfd)
382         self.test.logger.debug("BFD: Creating packet")
383         self.fill_packet_fields(packet)
384         if self.sha1_key:
385             hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
386                 "\0" * (20 - len(self.sha1_key.key))
387             self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
388                                    hashlib.sha1(hash_material).hexdigest())
389             packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
390         return packet
391
392     def send_packet(self, packet=None, interface=None):
393         """ send packet on interface, creating the packet if needed """
394         if packet is None:
395             packet = self.create_packet()
396         if interface is None:
397             interface = self.test.pg0
398         self.test.logger.debug(ppp("Sending packet:", packet))
399         interface.add_stream(packet)
400         self.test.pg_start()
401
402     def verify_sha1_auth(self, packet):
403         """ Verify correctness of authentication in BFD layer. """
404         bfd = packet[BFD]
405         self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
406         self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
407                                BFDAuthType)
408         self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
409         self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
410         if self.vpp_seq_number is None:
411             self.vpp_seq_number = bfd.auth_seq_num
412             self.test.logger.debug("Received initial sequence number: %s" %
413                                    self.vpp_seq_number)
414         else:
415             recvd_seq_num = bfd.auth_seq_num
416             self.test.logger.debug("Received followup sequence number: %s" %
417                                    recvd_seq_num)
418             if self.vpp_seq_number < 0xffffffff:
419                 if self.sha1_key.auth_type == \
420                         BFDAuthType.meticulous_keyed_sha1:
421                     self.test.assert_equal(recvd_seq_num,
422                                            self.vpp_seq_number + 1,
423                                            "BFD sequence number")
424                 else:
425                     self.test.assert_in_range(recvd_seq_num,
426                                               self.vpp_seq_number,
427                                               self.vpp_seq_number + 1,
428                                               "BFD sequence number")
429             else:
430                 if self.sha1_key.auth_type == \
431                         BFDAuthType.meticulous_keyed_sha1:
432                     self.test.assert_equal(recvd_seq_num, 0,
433                                            "BFD sequence number")
434                 else:
435                     self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
436                                        "BFD sequence number not one of "
437                                        "(%s, 0)" % self.vpp_seq_number)
438             self.vpp_seq_number = recvd_seq_num
439         # last 20 bytes represent the hash - so replace them with the key,
440         # pad the result with zeros and hash the result
441         hash_material = bfd.original[:-20] + self.sha1_key.key + \
442             "\0" * (20 - len(self.sha1_key.key))
443         expected_hash = hashlib.sha1(hash_material).hexdigest()
444         self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
445                                expected_hash, "Auth key hash")
446
447     def verify_bfd(self, packet):
448         """ Verify correctness of BFD layer. """
449         bfd = packet[BFD]
450         self.test.assert_equal(bfd.version, 1, "BFD version")
451         self.test.assert_equal(bfd.your_discriminator,
452                                self.my_discriminator,
453                                "BFD - your discriminator")
454         if self.sha1_key:
455             self.verify_sha1_auth(packet)
456
457
458 def bfd_session_up(test):
459     """ Bring BFD session up """
460     test.logger.info("BFD: Waiting for slow hello")
461     p = wait_for_bfd_packet(test, 2)
462     old_offset = None
463     if hasattr(test, 'vpp_clock_offset'):
464         old_offset = test.vpp_clock_offset
465     test.vpp_clock_offset = time.time() - p.time
466     test.logger.debug("BFD: Calculated vpp clock offset: %s",
467                       test.vpp_clock_offset)
468     if old_offset:
469         test.assertAlmostEqual(
470             old_offset, test.vpp_clock_offset, delta=0.5,
471             msg="vpp clock offset not stable (new: %s, old: %s)" %
472             (test.vpp_clock_offset, old_offset))
473     test.logger.info("BFD: Sending Init")
474     test.test_session.update(my_discriminator=randint(0, 40000000),
475                              your_discriminator=p[BFD].my_discriminator,
476                              state=BFDState.init)
477     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
478             BFDAuthType.meticulous_keyed_sha1:
479         test.test_session.inc_seq_num()
480     test.test_session.send_packet()
481     test.logger.info("BFD: Waiting for event")
482     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
483     verify_event(test, e, expected_state=BFDState.up)
484     test.logger.info("BFD: Session is Up")
485     test.test_session.update(state=BFDState.up)
486     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
487             BFDAuthType.meticulous_keyed_sha1:
488         test.test_session.inc_seq_num()
489     test.test_session.send_packet()
490     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
491
492
493 def bfd_session_down(test):
494     """ Bring BFD session down """
495     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
496     test.test_session.update(state=BFDState.down)
497     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
498             BFDAuthType.meticulous_keyed_sha1:
499         test.test_session.inc_seq_num()
500     test.test_session.send_packet()
501     test.logger.info("BFD: Waiting for event")
502     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
503     verify_event(test, e, expected_state=BFDState.down)
504     test.logger.info("BFD: Session is Down")
505     test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
506
507
508 def verify_bfd_session_config(test, session, state=None):
509     dump = session.get_bfd_udp_session_dump_entry()
510     test.assertIsNotNone(dump)
511     # since dump is not none, we have verified that sw_if_index and addresses
512     # are valid (in get_bfd_udp_session_dump_entry)
513     if state:
514         test.assert_equal(dump.state, state, "session state")
515     test.assert_equal(dump.required_min_rx, session.required_min_rx,
516                       "required min rx interval")
517     test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
518                       "desired min tx interval")
519     test.assert_equal(dump.detect_mult, session.detect_mult,
520                       "detect multiplier")
521     if session.sha1_key is None:
522         test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
523     else:
524         test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
525         test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
526                           "bfd key id")
527         test.assert_equal(dump.conf_key_id,
528                           session.sha1_key.conf_key_id,
529                           "config key id")
530
531
532 def verify_ip(test, packet):
533     """ Verify correctness of IP layer. """
534     if test.vpp_session.af == AF_INET6:
535         ip = packet[IPv6]
536         local_ip = test.pg0.local_ip6
537         remote_ip = test.pg0.remote_ip6
538         test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
539     else:
540         ip = packet[IP]
541         local_ip = test.pg0.local_ip4
542         remote_ip = test.pg0.remote_ip4
543         test.assert_equal(ip.ttl, 255, "IPv4 TTL")
544     test.assert_equal(ip.src, local_ip, "IP source address")
545     test.assert_equal(ip.dst, remote_ip, "IP destination address")
546
547
548 def verify_udp(test, packet):
549     """ Verify correctness of UDP layer. """
550     udp = packet[UDP]
551     test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
552     test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
553                          "UDP source port")
554
555
556 def verify_event(test, event, expected_state):
557     """ Verify correctness of event values. """
558     e = event
559     test.logger.debug("BFD: Event: %s" % repr(e))
560     test.assert_equal(e.sw_if_index,
561                       test.vpp_session.interface.sw_if_index,
562                       "BFD interface index")
563     is_ipv6 = 0
564     if test.vpp_session.af == AF_INET6:
565         is_ipv6 = 1
566     test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
567     if test.vpp_session.af == AF_INET:
568         test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
569                           "Local IPv4 address")
570         test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
571                           "Peer IPv4 address")
572     else:
573         test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
574                           "Local IPv6 address")
575         test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
576                           "Peer IPv6 address")
577     test.assert_equal(e.state, expected_state, BFDState)
578
579
580 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
581     """ wait for BFD packet and verify its correctness
582
583     :param timeout: how long to wait
584     :param pcap_time_min: ignore packets with pcap timestamp lower than this
585
586     :returns: tuple (packet, time spent waiting for packet)
587     """
588     test.logger.info("BFD: Waiting for BFD packet")
589     deadline = time.time() + timeout
590     counter = 0
591     while True:
592         counter += 1
593         # sanity check
594         test.assert_in_range(counter, 0, 100, "number of packets ignored")
595         time_left = deadline - time.time()
596         if time_left < 0:
597             raise CaptureTimeoutError("Packet did not arrive within timeout")
598         p = test.pg0.wait_for_packet(timeout=time_left)
599         test.logger.debug(ppp("BFD: Got packet:", p))
600         if pcap_time_min is not None and p.time < pcap_time_min:
601             test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
602                                   "pcap time min %s):" %
603                                   (p.time, pcap_time_min), p))
604         else:
605             break
606     bfd = p[BFD]
607     if bfd is None:
608         raise Exception(ppp("Unexpected or invalid BFD packet:", p))
609     if bfd.payload:
610         raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
611     verify_ip(test, p)
612     verify_udp(test, p)
613     test.test_session.verify_bfd(p)
614     return p
615
616
617 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
618 class BFD4TestCase(VppTestCase):
619     """Bidirectional Forwarding Detection (BFD)"""
620
621     pg0 = None
622     vpp_clock_offset = None
623     vpp_session = None
624     test_session = None
625
626     @classmethod
627     def setUpClass(cls):
628         super(BFD4TestCase, cls).setUpClass()
629         cls.vapi.cli("set log class bfd level debug")
630         try:
631             cls.create_pg_interfaces([0])
632             cls.create_loopback_interfaces(1)
633             cls.loopback0 = cls.lo_interfaces[0]
634             cls.loopback0.config_ip4()
635             cls.loopback0.admin_up()
636             cls.pg0.config_ip4()
637             cls.pg0.configure_ipv4_neighbors()
638             cls.pg0.admin_up()
639             cls.pg0.resolve_arp()
640
641         except Exception:
642             super(BFD4TestCase, cls).tearDownClass()
643             raise
644
645     def setUp(self):
646         super(BFD4TestCase, self).setUp()
647         self.factory = AuthKeyFactory()
648         self.vapi.want_bfd_events()
649         self.pg0.enable_capture()
650         try:
651             self.vpp_session = VppBFDUDPSession(self, self.pg0,
652                                                 self.pg0.remote_ip4)
653             self.vpp_session.add_vpp_config()
654             self.vpp_session.admin_up()
655             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
656         except:
657             self.vapi.want_bfd_events(enable_disable=0)
658             raise
659
660     def tearDown(self):
661         if not self.vpp_dead:
662             self.vapi.want_bfd_events(enable_disable=0)
663         self.vapi.collect_events()  # clear the event queue
664         super(BFD4TestCase, self).tearDown()
665
666     def test_session_up(self):
667         """ bring BFD session up """
668         bfd_session_up(self)
669
670     def test_session_up_by_ip(self):
671         """ bring BFD session up - first frame looked up by address pair """
672         self.logger.info("BFD: Sending Slow control frame")
673         self.test_session.update(my_discriminator=randint(0, 40000000))
674         self.test_session.send_packet()
675         self.pg0.enable_capture()
676         p = self.pg0.wait_for_packet(1)
677         self.assert_equal(p[BFD].your_discriminator,
678                           self.test_session.my_discriminator,
679                           "BFD - your discriminator")
680         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
681         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
682                                  state=BFDState.up)
683         self.logger.info("BFD: Waiting for event")
684         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
685         verify_event(self, e, expected_state=BFDState.init)
686         self.logger.info("BFD: Sending Up")
687         self.test_session.send_packet()
688         self.logger.info("BFD: Waiting for event")
689         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
690         verify_event(self, e, expected_state=BFDState.up)
691         self.logger.info("BFD: Session is Up")
692         self.test_session.update(state=BFDState.up)
693         self.test_session.send_packet()
694         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
695
696     def test_session_down(self):
697         """ bring BFD session down """
698         bfd_session_up(self)
699         bfd_session_down(self)
700
701     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
702     def test_hold_up(self):
703         """ hold BFD session up """
704         bfd_session_up(self)
705         for dummy in range(self.test_session.detect_mult * 2):
706             wait_for_bfd_packet(self)
707             self.test_session.send_packet()
708         self.assert_equal(len(self.vapi.collect_events()), 0,
709                           "number of bfd events")
710
711     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
712     def test_slow_timer(self):
713         """ verify slow periodic control frames while session down """
714         packet_count = 3
715         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
716         prev_packet = wait_for_bfd_packet(self, 2)
717         for dummy in range(packet_count):
718             next_packet = wait_for_bfd_packet(self, 2)
719             time_diff = next_packet.time - prev_packet.time
720             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
721             # to work around timing issues
722             self.assert_in_range(
723                 time_diff, 0.70, 1.05, "time between slow packets")
724             prev_packet = next_packet
725
726     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
727     def test_zero_remote_min_rx(self):
728         """ no packets when zero remote required min rx interval """
729         bfd_session_up(self)
730         self.test_session.update(required_min_rx=0)
731         self.test_session.send_packet()
732         for dummy in range(self.test_session.detect_mult):
733             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
734                        "sleep before transmitting bfd packet")
735             self.test_session.send_packet()
736             try:
737                 p = wait_for_bfd_packet(self, timeout=0)
738                 self.logger.error(ppp("Received unexpected packet:", p))
739             except CaptureTimeoutError:
740                 pass
741         self.assert_equal(
742             len(self.vapi.collect_events()), 0, "number of bfd events")
743         self.test_session.update(required_min_rx=300000)
744         for dummy in range(3):
745             self.test_session.send_packet()
746             wait_for_bfd_packet(
747                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
748         self.assert_equal(
749             len(self.vapi.collect_events()), 0, "number of bfd events")
750
751     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
752     def test_conn_down(self):
753         """ verify session goes down after inactivity """
754         bfd_session_up(self)
755         detection_time = self.test_session.detect_mult *\
756             self.vpp_session.required_min_rx / USEC_IN_SEC
757         self.sleep(detection_time, "waiting for BFD session time-out")
758         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
759         verify_event(self, e, expected_state=BFDState.down)
760
761     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
762     def test_large_required_min_rx(self):
763         """ large remote required min rx interval """
764         bfd_session_up(self)
765         p = wait_for_bfd_packet(self)
766         interval = 3000000
767         self.test_session.update(required_min_rx=interval)
768         self.test_session.send_packet()
769         time_mark = time.time()
770         count = 0
771         # busy wait here, trying to collect a packet or event, vpp is not
772         # allowed to send packets and the session will timeout first - so the
773         # Up->Down event must arrive before any packets do
774         while time.time() < time_mark + interval / USEC_IN_SEC:
775             try:
776                 p = wait_for_bfd_packet(self, timeout=0)
777                 # if vpp managed to send a packet before we did the session
778                 # session update, then that's fine, ignore it
779                 if p.time < time_mark - self.vpp_clock_offset:
780                     continue
781                 self.logger.error(ppp("Received unexpected packet:", p))
782                 count += 1
783             except CaptureTimeoutError:
784                 pass
785             events = self.vapi.collect_events()
786             if len(events) > 0:
787                 verify_event(self, events[0], BFDState.down)
788                 break
789         self.assert_equal(count, 0, "number of packets received")
790
791     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
792     def test_immediate_remote_min_rx_reduction(self):
793         """ immediately honor remote required min rx reduction """
794         self.vpp_session.remove_vpp_config()
795         self.vpp_session = VppBFDUDPSession(
796             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
797         self.pg0.enable_capture()
798         self.vpp_session.add_vpp_config()
799         self.test_session.update(desired_min_tx=1000000,
800                                  required_min_rx=1000000)
801         bfd_session_up(self)
802         reference_packet = wait_for_bfd_packet(self)
803         time_mark = time.time()
804         interval = 300000
805         self.test_session.update(required_min_rx=interval)
806         self.test_session.send_packet()
807         extra_time = time.time() - time_mark
808         p = wait_for_bfd_packet(self)
809         # first packet is allowed to be late by time we spent doing the update
810         # calculated in extra_time
811         self.assert_in_range(p.time - reference_packet.time,
812                              .95 * 0.75 * interval / USEC_IN_SEC,
813                              1.05 * interval / USEC_IN_SEC + extra_time,
814                              "time between BFD packets")
815         reference_packet = p
816         for dummy in range(3):
817             p = wait_for_bfd_packet(self)
818             diff = p.time - reference_packet.time
819             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
820                                  1.05 * interval / USEC_IN_SEC,
821                                  "time between BFD packets")
822             reference_packet = p
823
824     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
825     def test_modify_req_min_rx_double(self):
826         """ modify session - double required min rx """
827         bfd_session_up(self)
828         p = wait_for_bfd_packet(self)
829         self.test_session.update(desired_min_tx=10000,
830                                  required_min_rx=10000)
831         self.test_session.send_packet()
832         # double required min rx
833         self.vpp_session.modify_parameters(
834             required_min_rx=2 * self.vpp_session.required_min_rx)
835         p = wait_for_bfd_packet(
836             self, pcap_time_min=time.time() - self.vpp_clock_offset)
837         # poll bit needs to be set
838         self.assertIn("P", p.sprintf("%BFD.flags%"),
839                       "Poll bit not set in BFD packet")
840         # finish poll sequence with final packet
841         final = self.test_session.create_packet()
842         final[BFD].flags = "F"
843         timeout = self.test_session.detect_mult * \
844             max(self.test_session.desired_min_tx,
845                 self.vpp_session.required_min_rx) / USEC_IN_SEC
846         self.test_session.send_packet(final)
847         time_mark = time.time()
848         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
849         verify_event(self, e, expected_state=BFDState.down)
850         time_to_event = time.time() - time_mark
851         self.assert_in_range(time_to_event, .9 * timeout,
852                              1.1 * timeout, "session timeout")
853
854     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
855     def test_modify_req_min_rx_halve(self):
856         """ modify session - halve required min rx """
857         self.vpp_session.modify_parameters(
858             required_min_rx=2 * self.vpp_session.required_min_rx)
859         bfd_session_up(self)
860         p = wait_for_bfd_packet(self)
861         self.test_session.update(desired_min_tx=10000,
862                                  required_min_rx=10000)
863         self.test_session.send_packet()
864         p = wait_for_bfd_packet(
865             self, pcap_time_min=time.time() - self.vpp_clock_offset)
866         # halve required min rx
867         old_required_min_rx = self.vpp_session.required_min_rx
868         self.vpp_session.modify_parameters(
869             required_min_rx=0.5 * self.vpp_session.required_min_rx)
870         # now we wait 0.8*3*old-req-min-rx and the session should still be up
871         self.sleep(0.8 * self.vpp_session.detect_mult *
872                    old_required_min_rx / USEC_IN_SEC,
873                    "wait before finishing poll sequence")
874         self.assert_equal(len(self.vapi.collect_events()), 0,
875                           "number of bfd events")
876         p = wait_for_bfd_packet(self)
877         # poll bit needs to be set
878         self.assertIn("P", p.sprintf("%BFD.flags%"),
879                       "Poll bit not set in BFD packet")
880         # finish poll sequence with final packet
881         final = self.test_session.create_packet()
882         final[BFD].flags = "F"
883         self.test_session.send_packet(final)
884         # now the session should time out under new conditions
885         detection_time = self.test_session.detect_mult *\
886             self.vpp_session.required_min_rx / USEC_IN_SEC
887         before = time.time()
888         e = self.vapi.wait_for_event(
889             2 * detection_time, "bfd_udp_session_details")
890         after = time.time()
891         self.assert_in_range(after - before,
892                              0.9 * detection_time,
893                              1.1 * detection_time,
894                              "time before bfd session goes down")
895         verify_event(self, e, expected_state=BFDState.down)
896
897     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
898     def test_modify_detect_mult(self):
899         """ modify detect multiplier """
900         bfd_session_up(self)
901         p = wait_for_bfd_packet(self)
902         self.vpp_session.modify_parameters(detect_mult=1)
903         p = wait_for_bfd_packet(
904             self, pcap_time_min=time.time() - self.vpp_clock_offset)
905         self.assert_equal(self.vpp_session.detect_mult,
906                           p[BFD].detect_mult,
907                           "detect mult")
908         # poll bit must not be set
909         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
910                          "Poll bit not set in BFD packet")
911         self.vpp_session.modify_parameters(detect_mult=10)
912         p = wait_for_bfd_packet(
913             self, pcap_time_min=time.time() - self.vpp_clock_offset)
914         self.assert_equal(self.vpp_session.detect_mult,
915                           p[BFD].detect_mult,
916                           "detect mult")
917         # poll bit must not be set
918         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
919                          "Poll bit not set in BFD packet")
920
921     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
922     def test_queued_poll(self):
923         """ test poll sequence queueing """
924         bfd_session_up(self)
925         p = wait_for_bfd_packet(self)
926         self.vpp_session.modify_parameters(
927             required_min_rx=2 * self.vpp_session.required_min_rx)
928         p = wait_for_bfd_packet(self)
929         poll_sequence_start = time.time()
930         poll_sequence_length_min = 0.5
931         send_final_after = time.time() + poll_sequence_length_min
932         # poll bit needs to be set
933         self.assertIn("P", p.sprintf("%BFD.flags%"),
934                       "Poll bit not set in BFD packet")
935         self.assert_equal(p[BFD].required_min_rx_interval,
936                           self.vpp_session.required_min_rx,
937                           "BFD required min rx interval")
938         self.vpp_session.modify_parameters(
939             required_min_rx=2 * self.vpp_session.required_min_rx)
940         # 2nd poll sequence should be queued now
941         # don't send the reply back yet, wait for some time to emulate
942         # longer round-trip time
943         packet_count = 0
944         while time.time() < send_final_after:
945             self.test_session.send_packet()
946             p = wait_for_bfd_packet(self)
947             self.assert_equal(len(self.vapi.collect_events()), 0,
948                               "number of bfd events")
949             self.assert_equal(p[BFD].required_min_rx_interval,
950                               self.vpp_session.required_min_rx,
951                               "BFD required min rx interval")
952             packet_count += 1
953             # poll bit must be set
954             self.assertIn("P", p.sprintf("%BFD.flags%"),
955                           "Poll bit not set in BFD packet")
956         final = self.test_session.create_packet()
957         final[BFD].flags = "F"
958         self.test_session.send_packet(final)
959         # finish 1st with final
960         poll_sequence_length = time.time() - poll_sequence_start
961         # vpp must wait for some time before starting new poll sequence
962         poll_no_2_started = False
963         for dummy in range(2 * packet_count):
964             p = wait_for_bfd_packet(self)
965             self.assert_equal(len(self.vapi.collect_events()), 0,
966                               "number of bfd events")
967             if "P" in p.sprintf("%BFD.flags%"):
968                 poll_no_2_started = True
969                 if time.time() < poll_sequence_start + poll_sequence_length:
970                     raise Exception("VPP started 2nd poll sequence too soon")
971                 final = self.test_session.create_packet()
972                 final[BFD].flags = "F"
973                 self.test_session.send_packet(final)
974                 break
975             else:
976                 self.test_session.send_packet()
977         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
978         # finish 2nd with final
979         final = self.test_session.create_packet()
980         final[BFD].flags = "F"
981         self.test_session.send_packet(final)
982         p = wait_for_bfd_packet(self)
983         # poll bit must not be set
984         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
985                          "Poll bit set in BFD packet")
986
987     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
988     def test_poll_response(self):
989         """ test correct response to control frame with poll bit set """
990         bfd_session_up(self)
991         poll = self.test_session.create_packet()
992         poll[BFD].flags = "P"
993         self.test_session.send_packet(poll)
994         final = wait_for_bfd_packet(
995             self, pcap_time_min=time.time() - self.vpp_clock_offset)
996         self.assertIn("F", final.sprintf("%BFD.flags%"))
997
998     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
999     def test_no_periodic_if_remote_demand(self):
1000         """ no periodic frames outside poll sequence if remote demand set """
1001         bfd_session_up(self)
1002         demand = self.test_session.create_packet()
1003         demand[BFD].flags = "D"
1004         self.test_session.send_packet(demand)
1005         transmit_time = 0.9 \
1006             * max(self.vpp_session.required_min_rx,
1007                   self.test_session.desired_min_tx) \
1008             / USEC_IN_SEC
1009         count = 0
1010         for dummy in range(self.test_session.detect_mult * 2):
1011             time.sleep(transmit_time)
1012             self.test_session.send_packet(demand)
1013             try:
1014                 p = wait_for_bfd_packet(self, timeout=0)
1015                 self.logger.error(ppp("Received unexpected packet:", p))
1016                 count += 1
1017             except CaptureTimeoutError:
1018                 pass
1019         events = self.vapi.collect_events()
1020         for e in events:
1021             self.logger.error("Received unexpected event: %s", e)
1022         self.assert_equal(count, 0, "number of packets received")
1023         self.assert_equal(len(events), 0, "number of events received")
1024
1025     def test_echo_looped_back(self):
1026         """ echo packets looped back """
1027         # don't need a session in this case..
1028         self.vpp_session.remove_vpp_config()
1029         self.pg0.enable_capture()
1030         echo_packet_count = 10
1031         # random source port low enough to increment a few times..
1032         udp_sport_tx = randint(1, 50000)
1033         udp_sport_rx = udp_sport_tx
1034         echo_packet = (Ether(src=self.pg0.remote_mac,
1035                              dst=self.pg0.local_mac) /
1036                        IP(src=self.pg0.remote_ip4,
1037                           dst=self.pg0.remote_ip4) /
1038                        UDP(dport=BFD.udp_dport_echo) /
1039                        Raw("this should be looped back"))
1040         for dummy in range(echo_packet_count):
1041             self.sleep(.01, "delay between echo packets")
1042             echo_packet[UDP].sport = udp_sport_tx
1043             udp_sport_tx += 1
1044             self.logger.debug(ppp("Sending packet:", echo_packet))
1045             self.pg0.add_stream(echo_packet)
1046             self.pg_start()
1047         for dummy in range(echo_packet_count):
1048             p = self.pg0.wait_for_packet(1)
1049             self.logger.debug(ppp("Got packet:", p))
1050             ether = p[Ether]
1051             self.assert_equal(self.pg0.remote_mac,
1052                               ether.dst, "Destination MAC")
1053             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1054             ip = p[IP]
1055             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1056             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1057             udp = p[UDP]
1058             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1059                               "UDP destination port")
1060             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1061             udp_sport_rx += 1
1062             # need to compare the hex payload here, otherwise BFD_vpp_echo
1063             # gets in way
1064             self.assertEqual(str(p[UDP].payload),
1065                              str(echo_packet[UDP].payload),
1066                              "Received packet is not the echo packet sent")
1067         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1068                           "ECHO packet identifier for test purposes)")
1069
1070     def test_echo(self):
1071         """ echo function """
1072         bfd_session_up(self)
1073         self.test_session.update(required_min_echo_rx=150000)
1074         self.test_session.send_packet()
1075         detection_time = self.test_session.detect_mult *\
1076             self.vpp_session.required_min_rx / USEC_IN_SEC
1077         # echo shouldn't work without echo source set
1078         for dummy in range(10):
1079             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1080             self.sleep(sleep, "delay before sending bfd packet")
1081             self.test_session.send_packet()
1082         p = wait_for_bfd_packet(
1083             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1084         self.assert_equal(p[BFD].required_min_rx_interval,
1085                           self.vpp_session.required_min_rx,
1086                           "BFD required min rx interval")
1087         self.test_session.send_packet()
1088         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1089         echo_seen = False
1090         # should be turned on - loopback echo packets
1091         for dummy in range(3):
1092             loop_until = time.time() + 0.75 * detection_time
1093             while time.time() < loop_until:
1094                 p = self.pg0.wait_for_packet(1)
1095                 self.logger.debug(ppp("Got packet:", p))
1096                 if p[UDP].dport == BFD.udp_dport_echo:
1097                     self.assert_equal(
1098                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1099                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1100                                         "BFD ECHO src IP equal to loopback IP")
1101                     self.logger.debug(ppp("Looping back packet:", p))
1102                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1103                                       "ECHO packet destination MAC address")
1104                     p[Ether].dst = self.pg0.local_mac
1105                     self.pg0.add_stream(p)
1106                     self.pg_start()
1107                     echo_seen = True
1108                 elif p.haslayer(BFD):
1109                     if echo_seen:
1110                         self.assertGreaterEqual(
1111                             p[BFD].required_min_rx_interval,
1112                             1000000)
1113                     if "P" in p.sprintf("%BFD.flags%"):
1114                         final = self.test_session.create_packet()
1115                         final[BFD].flags = "F"
1116                         self.test_session.send_packet(final)
1117                 else:
1118                     raise Exception(ppp("Received unknown packet:", p))
1119
1120                 self.assert_equal(len(self.vapi.collect_events()), 0,
1121                                   "number of bfd events")
1122             self.test_session.send_packet()
1123         self.assertTrue(echo_seen, "No echo packets received")
1124
1125     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1126     def test_echo_fail(self):
1127         """ session goes down if echo function fails """
1128         bfd_session_up(self)
1129         self.test_session.update(required_min_echo_rx=150000)
1130         self.test_session.send_packet()
1131         detection_time = self.test_session.detect_mult *\
1132             self.vpp_session.required_min_rx / USEC_IN_SEC
1133         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1134         # echo function should be used now, but we will drop the echo packets
1135         verified_diag = False
1136         for dummy in range(3):
1137             loop_until = time.time() + 0.75 * detection_time
1138             while time.time() < loop_until:
1139                 p = self.pg0.wait_for_packet(1)
1140                 self.logger.debug(ppp("Got packet:", p))
1141                 if p[UDP].dport == BFD.udp_dport_echo:
1142                     # dropped
1143                     pass
1144                 elif p.haslayer(BFD):
1145                     if "P" in p.sprintf("%BFD.flags%"):
1146                         self.assertGreaterEqual(
1147                             p[BFD].required_min_rx_interval,
1148                             1000000)
1149                         final = self.test_session.create_packet()
1150                         final[BFD].flags = "F"
1151                         self.test_session.send_packet(final)
1152                     if p[BFD].state == BFDState.down:
1153                         self.assert_equal(p[BFD].diag,
1154                                           BFDDiagCode.echo_function_failed,
1155                                           BFDDiagCode)
1156                         verified_diag = True
1157                 else:
1158                     raise Exception(ppp("Received unknown packet:", p))
1159             self.test_session.send_packet()
1160         events = self.vapi.collect_events()
1161         self.assert_equal(len(events), 1, "number of bfd events")
1162         self.assert_equal(events[0].state, BFDState.down, BFDState)
1163         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1164
1165     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1166     def test_echo_stop(self):
1167         """ echo function stops if peer sets required min echo rx zero """
1168         bfd_session_up(self)
1169         self.test_session.update(required_min_echo_rx=150000)
1170         self.test_session.send_packet()
1171         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1172         # wait for first echo packet
1173         while True:
1174             p = self.pg0.wait_for_packet(1)
1175             self.logger.debug(ppp("Got packet:", p))
1176             if p[UDP].dport == BFD.udp_dport_echo:
1177                 self.logger.debug(ppp("Looping back packet:", p))
1178                 p[Ether].dst = self.pg0.local_mac
1179                 self.pg0.add_stream(p)
1180                 self.pg_start()
1181                 break
1182             elif p.haslayer(BFD):
1183                 # ignore BFD
1184                 pass
1185             else:
1186                 raise Exception(ppp("Received unknown packet:", p))
1187         self.test_session.update(required_min_echo_rx=0)
1188         self.test_session.send_packet()
1189         # echo packets shouldn't arrive anymore
1190         for dummy in range(5):
1191             wait_for_bfd_packet(
1192                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1193             self.test_session.send_packet()
1194             events = self.vapi.collect_events()
1195             self.assert_equal(len(events), 0, "number of bfd events")
1196
1197     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1198     def test_echo_source_removed(self):
1199         """ echo function stops if echo source is removed """
1200         bfd_session_up(self)
1201         self.test_session.update(required_min_echo_rx=150000)
1202         self.test_session.send_packet()
1203         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1204         # wait for first echo packet
1205         while True:
1206             p = self.pg0.wait_for_packet(1)
1207             self.logger.debug(ppp("Got packet:", p))
1208             if p[UDP].dport == BFD.udp_dport_echo:
1209                 self.logger.debug(ppp("Looping back packet:", p))
1210                 p[Ether].dst = self.pg0.local_mac
1211                 self.pg0.add_stream(p)
1212                 self.pg_start()
1213                 break
1214             elif p.haslayer(BFD):
1215                 # ignore BFD
1216                 pass
1217             else:
1218                 raise Exception(ppp("Received unknown packet:", p))
1219         self.vapi.bfd_udp_del_echo_source()
1220         self.test_session.send_packet()
1221         # echo packets shouldn't arrive anymore
1222         for dummy in range(5):
1223             wait_for_bfd_packet(
1224                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1225             self.test_session.send_packet()
1226             events = self.vapi.collect_events()
1227             self.assert_equal(len(events), 0, "number of bfd events")
1228
1229     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1230     def test_stale_echo(self):
1231         """ stale echo packets don't keep a session up """
1232         bfd_session_up(self)
1233         self.test_session.update(required_min_echo_rx=150000)
1234         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1235         self.test_session.send_packet()
1236         # should be turned on - loopback echo packets
1237         echo_packet = None
1238         timeout_at = None
1239         timeout_ok = False
1240         for dummy in range(10 * self.vpp_session.detect_mult):
1241             p = self.pg0.wait_for_packet(1)
1242             if p[UDP].dport == BFD.udp_dport_echo:
1243                 if echo_packet is None:
1244                     self.logger.debug(ppp("Got first echo packet:", p))
1245                     echo_packet = p
1246                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1247                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1248                 else:
1249                     self.logger.debug(ppp("Got followup echo packet:", p))
1250                 self.logger.debug(ppp("Looping back first echo packet:", p))
1251                 echo_packet[Ether].dst = self.pg0.local_mac
1252                 self.pg0.add_stream(echo_packet)
1253                 self.pg_start()
1254             elif p.haslayer(BFD):
1255                 self.logger.debug(ppp("Got packet:", p))
1256                 if "P" in p.sprintf("%BFD.flags%"):
1257                     final = self.test_session.create_packet()
1258                     final[BFD].flags = "F"
1259                     self.test_session.send_packet(final)
1260                 if p[BFD].state == BFDState.down:
1261                     self.assertIsNotNone(
1262                         timeout_at,
1263                         "Session went down before first echo packet received")
1264                     now = time.time()
1265                     self.assertGreaterEqual(
1266                         now, timeout_at,
1267                         "Session timeout at %s, but is expected at %s" %
1268                         (now, timeout_at))
1269                     self.assert_equal(p[BFD].diag,
1270                                       BFDDiagCode.echo_function_failed,
1271                                       BFDDiagCode)
1272                     events = self.vapi.collect_events()
1273                     self.assert_equal(len(events), 1, "number of bfd events")
1274                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1275                     timeout_ok = True
1276                     break
1277             else:
1278                 raise Exception(ppp("Received unknown packet:", p))
1279             self.test_session.send_packet()
1280         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1281
1282     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1283     def test_invalid_echo_checksum(self):
1284         """ echo packets with invalid checksum don't keep a session up """
1285         bfd_session_up(self)
1286         self.test_session.update(required_min_echo_rx=150000)
1287         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1288         self.test_session.send_packet()
1289         # should be turned on - loopback echo packets
1290         timeout_at = None
1291         timeout_ok = False
1292         for dummy in range(10 * self.vpp_session.detect_mult):
1293             p = self.pg0.wait_for_packet(1)
1294             if p[UDP].dport == BFD.udp_dport_echo:
1295                 self.logger.debug(ppp("Got echo packet:", p))
1296                 if timeout_at is None:
1297                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1298                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1299                 p[BFD_vpp_echo].checksum = getrandbits(64)
1300                 p[Ether].dst = self.pg0.local_mac
1301                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1302                 self.pg0.add_stream(p)
1303                 self.pg_start()
1304             elif p.haslayer(BFD):
1305                 self.logger.debug(ppp("Got packet:", p))
1306                 if "P" in p.sprintf("%BFD.flags%"):
1307                     final = self.test_session.create_packet()
1308                     final[BFD].flags = "F"
1309                     self.test_session.send_packet(final)
1310                 if p[BFD].state == BFDState.down:
1311                     self.assertIsNotNone(
1312                         timeout_at,
1313                         "Session went down before first echo packet received")
1314                     now = time.time()
1315                     self.assertGreaterEqual(
1316                         now, timeout_at,
1317                         "Session timeout at %s, but is expected at %s" %
1318                         (now, timeout_at))
1319                     self.assert_equal(p[BFD].diag,
1320                                       BFDDiagCode.echo_function_failed,
1321                                       BFDDiagCode)
1322                     events = self.vapi.collect_events()
1323                     self.assert_equal(len(events), 1, "number of bfd events")
1324                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1325                     timeout_ok = True
1326                     break
1327             else:
1328                 raise Exception(ppp("Received unknown packet:", p))
1329             self.test_session.send_packet()
1330         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1331
1332     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1333     def test_admin_up_down(self):
1334         """ put session admin-up and admin-down """
1335         bfd_session_up(self)
1336         self.vpp_session.admin_down()
1337         self.pg0.enable_capture()
1338         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1339         verify_event(self, e, expected_state=BFDState.admin_down)
1340         for dummy in range(2):
1341             p = wait_for_bfd_packet(self)
1342             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1343         # try to bring session up - shouldn't be possible
1344         self.test_session.update(state=BFDState.init)
1345         self.test_session.send_packet()
1346         for dummy in range(2):
1347             p = wait_for_bfd_packet(self)
1348             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1349         self.vpp_session.admin_up()
1350         self.test_session.update(state=BFDState.down)
1351         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1352         verify_event(self, e, expected_state=BFDState.down)
1353         p = wait_for_bfd_packet(
1354             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1355         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1356         self.test_session.send_packet()
1357         p = wait_for_bfd_packet(
1358             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1359         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1360         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1361         verify_event(self, e, expected_state=BFDState.init)
1362         self.test_session.update(state=BFDState.up)
1363         self.test_session.send_packet()
1364         p = wait_for_bfd_packet(
1365             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1366         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1367         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1368         verify_event(self, e, expected_state=BFDState.up)
1369
1370     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1371     def test_config_change_remote_demand(self):
1372         """ configuration change while peer in demand mode """
1373         bfd_session_up(self)
1374         demand = self.test_session.create_packet()
1375         demand[BFD].flags = "D"
1376         self.test_session.send_packet(demand)
1377         self.vpp_session.modify_parameters(
1378             required_min_rx=2 * self.vpp_session.required_min_rx)
1379         p = wait_for_bfd_packet(
1380             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1381         # poll bit must be set
1382         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1383         # terminate poll sequence
1384         final = self.test_session.create_packet()
1385         final[BFD].flags = "D+F"
1386         self.test_session.send_packet(final)
1387         # vpp should be quiet now again
1388         transmit_time = 0.9 \
1389             * max(self.vpp_session.required_min_rx,
1390                   self.test_session.desired_min_tx) \
1391             / USEC_IN_SEC
1392         count = 0
1393         for dummy in range(self.test_session.detect_mult * 2):
1394             time.sleep(transmit_time)
1395             self.test_session.send_packet(demand)
1396             try:
1397                 p = wait_for_bfd_packet(self, timeout=0)
1398                 self.logger.error(ppp("Received unexpected packet:", p))
1399                 count += 1
1400             except CaptureTimeoutError:
1401                 pass
1402         events = self.vapi.collect_events()
1403         for e in events:
1404             self.logger.error("Received unexpected event: %s", e)
1405         self.assert_equal(count, 0, "number of packets received")
1406         self.assert_equal(len(events), 0, "number of events received")
1407
1408     def test_intf_deleted(self):
1409         """ interface with bfd session deleted """
1410         intf = VppLoInterface(self, 0)
1411         intf.config_ip4()
1412         intf.admin_up()
1413         sw_if_index = intf.sw_if_index
1414         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1415         vpp_session.add_vpp_config()
1416         vpp_session.admin_up()
1417         intf.remove_vpp_config()
1418         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1419         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1420         self.assertFalse(vpp_session.query_vpp_config())
1421
1422
1423 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1424 class BFD6TestCase(VppTestCase):
1425     """Bidirectional Forwarding Detection (BFD) (IPv6) """
1426
1427     pg0 = None
1428     vpp_clock_offset = None
1429     vpp_session = None
1430     test_session = None
1431
1432     @classmethod
1433     def setUpClass(cls):
1434         super(BFD6TestCase, cls).setUpClass()
1435         cls.vapi.cli("set log class bfd level debug")
1436         try:
1437             cls.create_pg_interfaces([0])
1438             cls.pg0.config_ip6()
1439             cls.pg0.configure_ipv6_neighbors()
1440             cls.pg0.admin_up()
1441             cls.pg0.resolve_ndp()
1442             cls.create_loopback_interfaces(1)
1443             cls.loopback0 = cls.lo_interfaces[0]
1444             cls.loopback0.config_ip6()
1445             cls.loopback0.admin_up()
1446
1447         except Exception:
1448             super(BFD6TestCase, cls).tearDownClass()
1449             raise
1450
1451     def setUp(self):
1452         super(BFD6TestCase, self).setUp()
1453         self.factory = AuthKeyFactory()
1454         self.vapi.want_bfd_events()
1455         self.pg0.enable_capture()
1456         try:
1457             self.vpp_session = VppBFDUDPSession(self, self.pg0,
1458                                                 self.pg0.remote_ip6,
1459                                                 af=AF_INET6)
1460             self.vpp_session.add_vpp_config()
1461             self.vpp_session.admin_up()
1462             self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1463             self.logger.debug(self.vapi.cli("show adj nbr"))
1464         except:
1465             self.vapi.want_bfd_events(enable_disable=0)
1466             raise
1467
1468     def tearDown(self):
1469         if not self.vpp_dead:
1470             self.vapi.want_bfd_events(enable_disable=0)
1471         self.vapi.collect_events()  # clear the event queue
1472         super(BFD6TestCase, self).tearDown()
1473
1474     def test_session_up(self):
1475         """ bring BFD session up """
1476         bfd_session_up(self)
1477
1478     def test_session_up_by_ip(self):
1479         """ bring BFD session up - first frame looked up by address pair """
1480         self.logger.info("BFD: Sending Slow control frame")
1481         self.test_session.update(my_discriminator=randint(0, 40000000))
1482         self.test_session.send_packet()
1483         self.pg0.enable_capture()
1484         p = self.pg0.wait_for_packet(1)
1485         self.assert_equal(p[BFD].your_discriminator,
1486                           self.test_session.my_discriminator,
1487                           "BFD - your discriminator")
1488         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1489         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1490                                  state=BFDState.up)
1491         self.logger.info("BFD: Waiting for event")
1492         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1493         verify_event(self, e, expected_state=BFDState.init)
1494         self.logger.info("BFD: Sending Up")
1495         self.test_session.send_packet()
1496         self.logger.info("BFD: Waiting for event")
1497         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1498         verify_event(self, e, expected_state=BFDState.up)
1499         self.logger.info("BFD: Session is Up")
1500         self.test_session.update(state=BFDState.up)
1501         self.test_session.send_packet()
1502         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1503
1504     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1505     def test_hold_up(self):
1506         """ hold BFD session up """
1507         bfd_session_up(self)
1508         for dummy in range(self.test_session.detect_mult * 2):
1509             wait_for_bfd_packet(self)
1510             self.test_session.send_packet()
1511         self.assert_equal(len(self.vapi.collect_events()), 0,
1512                           "number of bfd events")
1513         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1514
1515     def test_echo_looped_back(self):
1516         """ echo packets looped back """
1517         # don't need a session in this case..
1518         self.vpp_session.remove_vpp_config()
1519         self.pg0.enable_capture()
1520         echo_packet_count = 10
1521         # random source port low enough to increment a few times..
1522         udp_sport_tx = randint(1, 50000)
1523         udp_sport_rx = udp_sport_tx
1524         echo_packet = (Ether(src=self.pg0.remote_mac,
1525                              dst=self.pg0.local_mac) /
1526                        IPv6(src=self.pg0.remote_ip6,
1527                             dst=self.pg0.remote_ip6) /
1528                        UDP(dport=BFD.udp_dport_echo) /
1529                        Raw("this should be looped back"))
1530         for dummy in range(echo_packet_count):
1531             self.sleep(.01, "delay between echo packets")
1532             echo_packet[UDP].sport = udp_sport_tx
1533             udp_sport_tx += 1
1534             self.logger.debug(ppp("Sending packet:", echo_packet))
1535             self.pg0.add_stream(echo_packet)
1536             self.pg_start()
1537         for dummy in range(echo_packet_count):
1538             p = self.pg0.wait_for_packet(1)
1539             self.logger.debug(ppp("Got packet:", p))
1540             ether = p[Ether]
1541             self.assert_equal(self.pg0.remote_mac,
1542                               ether.dst, "Destination MAC")
1543             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1544             ip = p[IPv6]
1545             self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1546             self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1547             udp = p[UDP]
1548             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1549                               "UDP destination port")
1550             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1551             udp_sport_rx += 1
1552             # need to compare the hex payload here, otherwise BFD_vpp_echo
1553             # gets in way
1554             self.assertEqual(str(p[UDP].payload),
1555                              str(echo_packet[UDP].payload),
1556                              "Received packet is not the echo packet sent")
1557         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1558                           "ECHO packet identifier for test purposes)")
1559         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1560                           "ECHO packet identifier for test purposes)")
1561
1562     def test_echo(self):
1563         """ echo function """
1564         bfd_session_up(self)
1565         self.test_session.update(required_min_echo_rx=150000)
1566         self.test_session.send_packet()
1567         detection_time = self.test_session.detect_mult *\
1568             self.vpp_session.required_min_rx / USEC_IN_SEC
1569         # echo shouldn't work without echo source set
1570         for dummy in range(10):
1571             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1572             self.sleep(sleep, "delay before sending bfd packet")
1573             self.test_session.send_packet()
1574         p = wait_for_bfd_packet(
1575             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1576         self.assert_equal(p[BFD].required_min_rx_interval,
1577                           self.vpp_session.required_min_rx,
1578                           "BFD required min rx interval")
1579         self.test_session.send_packet()
1580         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1581         echo_seen = False
1582         # should be turned on - loopback echo packets
1583         for dummy in range(3):
1584             loop_until = time.time() + 0.75 * detection_time
1585             while time.time() < loop_until:
1586                 p = self.pg0.wait_for_packet(1)
1587                 self.logger.debug(ppp("Got packet:", p))
1588                 if p[UDP].dport == BFD.udp_dport_echo:
1589                     self.assert_equal(
1590                         p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1591                     self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1592                                         "BFD ECHO src IP equal to loopback IP")
1593                     self.logger.debug(ppp("Looping back packet:", p))
1594                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1595                                       "ECHO packet destination MAC address")
1596                     p[Ether].dst = self.pg0.local_mac
1597                     self.pg0.add_stream(p)
1598                     self.pg_start()
1599                     echo_seen = True
1600                 elif p.haslayer(BFD):
1601                     if echo_seen:
1602                         self.assertGreaterEqual(
1603                             p[BFD].required_min_rx_interval,
1604                             1000000)
1605                     if "P" in p.sprintf("%BFD.flags%"):
1606                         final = self.test_session.create_packet()
1607                         final[BFD].flags = "F"
1608                         self.test_session.send_packet(final)
1609                 else:
1610                     raise Exception(ppp("Received unknown packet:", p))
1611
1612                 self.assert_equal(len(self.vapi.collect_events()), 0,
1613                                   "number of bfd events")
1614             self.test_session.send_packet()
1615         self.assertTrue(echo_seen, "No echo packets received")
1616
1617     def test_intf_deleted(self):
1618         """ interface with bfd session deleted """
1619         intf = VppLoInterface(self, 0)
1620         intf.config_ip6()
1621         intf.admin_up()
1622         sw_if_index = intf.sw_if_index
1623         vpp_session = VppBFDUDPSession(
1624             self, intf, intf.remote_ip6, af=AF_INET6)
1625         vpp_session.add_vpp_config()
1626         vpp_session.admin_up()
1627         intf.remove_vpp_config()
1628         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1629         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1630         self.assertFalse(vpp_session.query_vpp_config())
1631
1632
1633 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1634 class BFDFIBTestCase(VppTestCase):
1635     """ BFD-FIB interactions (IPv6) """
1636
1637     vpp_session = None
1638     test_session = None
1639
1640     def setUp(self):
1641         super(BFDFIBTestCase, self).setUp()
1642         self.create_pg_interfaces(range(1))
1643
1644         self.vapi.want_bfd_events()
1645         self.pg0.enable_capture()
1646
1647         for i in self.pg_interfaces:
1648             i.admin_up()
1649             i.config_ip6()
1650             i.configure_ipv6_neighbors()
1651
1652     def tearDown(self):
1653         if not self.vpp_dead:
1654             self.vapi.want_bfd_events(enable_disable=0)
1655
1656         super(BFDFIBTestCase, self).tearDown()
1657
1658     @staticmethod
1659     def pkt_is_not_data_traffic(p):
1660         """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1661         if p.haslayer(BFD) or is_ipv6_misc(p):
1662             return True
1663         return False
1664
1665     def test_session_with_fib(self):
1666         """ BFD-FIB interactions """
1667
1668         # packets to match against both of the routes
1669         p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1670               IPv6(src="3001::1", dst="2001::1") /
1671               UDP(sport=1234, dport=1234) /
1672               Raw('\xa5' * 100)),
1673              (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1674               IPv6(src="3001::1", dst="2002::1") /
1675               UDP(sport=1234, dport=1234) /
1676               Raw('\xa5' * 100))]
1677
1678         # A recursive and a non-recursive route via a next-hop that
1679         # will have a BFD session
1680         ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1681                                   [VppRoutePath(self.pg0.remote_ip6,
1682                                                 self.pg0.sw_if_index,
1683                                                 proto=DpoProto.DPO_PROTO_IP6)],
1684                                   is_ip6=1)
1685         ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1686                                   [VppRoutePath(self.pg0.remote_ip6,
1687                                                 0xffffffff,
1688                                                 proto=DpoProto.DPO_PROTO_IP6)],
1689                                   is_ip6=1)
1690         ip_2001_s_64.add_vpp_config()
1691         ip_2002_s_64.add_vpp_config()
1692
1693         # bring the session up now the routes are present
1694         self.vpp_session = VppBFDUDPSession(self,
1695                                             self.pg0,
1696                                             self.pg0.remote_ip6,
1697                                             af=AF_INET6)
1698         self.vpp_session.add_vpp_config()
1699         self.vpp_session.admin_up()
1700         self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1701
1702         # session is up - traffic passes
1703         bfd_session_up(self)
1704
1705         self.pg0.add_stream(p)
1706         self.pg_start()
1707         for packet in p:
1708             captured = self.pg0.wait_for_packet(
1709                 1,
1710                 filter_out_fn=self.pkt_is_not_data_traffic)
1711             self.assertEqual(captured[IPv6].dst,
1712                              packet[IPv6].dst)
1713
1714         # session is up - traffic is dropped
1715         bfd_session_down(self)
1716
1717         self.pg0.add_stream(p)
1718         self.pg_start()
1719         with self.assertRaises(CaptureTimeoutError):
1720             self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1721
1722         # session is up - traffic passes
1723         bfd_session_up(self)
1724
1725         self.pg0.add_stream(p)
1726         self.pg_start()
1727         for packet in p:
1728             captured = self.pg0.wait_for_packet(
1729                 1,
1730                 filter_out_fn=self.pkt_is_not_data_traffic)
1731             self.assertEqual(captured[IPv6].dst,
1732                              packet[IPv6].dst)
1733
1734
1735 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1736 class BFDSHA1TestCase(VppTestCase):
1737     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1738
1739     pg0 = None
1740     vpp_clock_offset = None
1741     vpp_session = None
1742     test_session = None
1743
1744     @classmethod
1745     def setUpClass(cls):
1746         super(BFDSHA1TestCase, cls).setUpClass()
1747         cls.vapi.cli("set log class bfd level debug")
1748         try:
1749             cls.create_pg_interfaces([0])
1750             cls.pg0.config_ip4()
1751             cls.pg0.admin_up()
1752             cls.pg0.resolve_arp()
1753
1754         except Exception:
1755             super(BFDSHA1TestCase, cls).tearDownClass()
1756             raise
1757
1758     def setUp(self):
1759         super(BFDSHA1TestCase, self).setUp()
1760         self.factory = AuthKeyFactory()
1761         self.vapi.want_bfd_events()
1762         self.pg0.enable_capture()
1763
1764     def tearDown(self):
1765         if not self.vpp_dead:
1766             self.vapi.want_bfd_events(enable_disable=0)
1767         self.vapi.collect_events()  # clear the event queue
1768         super(BFDSHA1TestCase, self).tearDown()
1769
1770     def test_session_up(self):
1771         """ bring BFD session up """
1772         key = self.factory.create_random_key(self)
1773         key.add_vpp_config()
1774         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1775                                             self.pg0.remote_ip4,
1776                                             sha1_key=key)
1777         self.vpp_session.add_vpp_config()
1778         self.vpp_session.admin_up()
1779         self.test_session = BFDTestSession(
1780             self, self.pg0, AF_INET, sha1_key=key,
1781             bfd_key_id=self.vpp_session.bfd_key_id)
1782         bfd_session_up(self)
1783
1784     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1785     def test_hold_up(self):
1786         """ hold BFD session up """
1787         key = self.factory.create_random_key(self)
1788         key.add_vpp_config()
1789         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1790                                             self.pg0.remote_ip4,
1791                                             sha1_key=key)
1792         self.vpp_session.add_vpp_config()
1793         self.vpp_session.admin_up()
1794         self.test_session = BFDTestSession(
1795             self, self.pg0, AF_INET, sha1_key=key,
1796             bfd_key_id=self.vpp_session.bfd_key_id)
1797         bfd_session_up(self)
1798         for dummy in range(self.test_session.detect_mult * 2):
1799             wait_for_bfd_packet(self)
1800             self.test_session.send_packet()
1801         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1802
1803     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1804     def test_hold_up_meticulous(self):
1805         """ hold BFD session up - meticulous auth """
1806         key = self.factory.create_random_key(
1807             self, BFDAuthType.meticulous_keyed_sha1)
1808         key.add_vpp_config()
1809         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1810                                             self.pg0.remote_ip4, sha1_key=key)
1811         self.vpp_session.add_vpp_config()
1812         self.vpp_session.admin_up()
1813         # specify sequence number so that it wraps
1814         self.test_session = BFDTestSession(
1815             self, self.pg0, AF_INET, sha1_key=key,
1816             bfd_key_id=self.vpp_session.bfd_key_id,
1817             our_seq_number=0xFFFFFFFF - 4)
1818         bfd_session_up(self)
1819         for dummy in range(30):
1820             wait_for_bfd_packet(self)
1821             self.test_session.inc_seq_num()
1822             self.test_session.send_packet()
1823         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1824
1825     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1826     def test_send_bad_seq_number(self):
1827         """ session is not kept alive by msgs with bad sequence numbers"""
1828         key = self.factory.create_random_key(
1829             self, BFDAuthType.meticulous_keyed_sha1)
1830         key.add_vpp_config()
1831         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1832                                             self.pg0.remote_ip4, sha1_key=key)
1833         self.vpp_session.add_vpp_config()
1834         self.test_session = BFDTestSession(
1835             self, self.pg0, AF_INET, sha1_key=key,
1836             bfd_key_id=self.vpp_session.bfd_key_id)
1837         bfd_session_up(self)
1838         detection_time = self.test_session.detect_mult *\
1839             self.vpp_session.required_min_rx / USEC_IN_SEC
1840         send_until = time.time() + 2 * detection_time
1841         while time.time() < send_until:
1842             self.test_session.send_packet()
1843             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1844                        "time between bfd packets")
1845         e = self.vapi.collect_events()
1846         # session should be down now, because the sequence numbers weren't
1847         # updated
1848         self.assert_equal(len(e), 1, "number of bfd events")
1849         verify_event(self, e[0], expected_state=BFDState.down)
1850
1851     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1852                                        legitimate_test_session,
1853                                        rogue_test_session,
1854                                        rogue_bfd_values=None):
1855         """ execute a rogue session interaction scenario
1856
1857         1. create vpp session, add config
1858         2. bring the legitimate session up
1859         3. copy the bfd values from legitimate session to rogue session
1860         4. apply rogue_bfd_values to rogue session
1861         5. set rogue session state to down
1862         6. send message to take the session down from the rogue session
1863         7. assert that the legitimate session is unaffected
1864         """
1865
1866         self.vpp_session = vpp_bfd_udp_session
1867         self.vpp_session.add_vpp_config()
1868         self.test_session = legitimate_test_session
1869         # bring vpp session up
1870         bfd_session_up(self)
1871         # send packet from rogue session
1872         rogue_test_session.update(
1873             my_discriminator=self.test_session.my_discriminator,
1874             your_discriminator=self.test_session.your_discriminator,
1875             desired_min_tx=self.test_session.desired_min_tx,
1876             required_min_rx=self.test_session.required_min_rx,
1877             detect_mult=self.test_session.detect_mult,
1878             diag=self.test_session.diag,
1879             state=self.test_session.state,
1880             auth_type=self.test_session.auth_type)
1881         if rogue_bfd_values:
1882             rogue_test_session.update(**rogue_bfd_values)
1883         rogue_test_session.update(state=BFDState.down)
1884         rogue_test_session.send_packet()
1885         wait_for_bfd_packet(self)
1886         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1887
1888     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1889     def test_mismatch_auth(self):
1890         """ session is not brought down by unauthenticated msg """
1891         key = self.factory.create_random_key(self)
1892         key.add_vpp_config()
1893         vpp_session = VppBFDUDPSession(
1894             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1895         legitimate_test_session = BFDTestSession(
1896             self, self.pg0, AF_INET, sha1_key=key,
1897             bfd_key_id=vpp_session.bfd_key_id)
1898         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1899         self.execute_rogue_session_scenario(vpp_session,
1900                                             legitimate_test_session,
1901                                             rogue_test_session)
1902
1903     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1904     def test_mismatch_bfd_key_id(self):
1905         """ session is not brought down by msg with non-existent key-id """
1906         key = self.factory.create_random_key(self)
1907         key.add_vpp_config()
1908         vpp_session = VppBFDUDPSession(
1909             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1910         # pick a different random bfd key id
1911         x = randint(0, 255)
1912         while x == vpp_session.bfd_key_id:
1913             x = randint(0, 255)
1914         legitimate_test_session = BFDTestSession(
1915             self, self.pg0, AF_INET, sha1_key=key,
1916             bfd_key_id=vpp_session.bfd_key_id)
1917         rogue_test_session = BFDTestSession(
1918             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1919         self.execute_rogue_session_scenario(vpp_session,
1920                                             legitimate_test_session,
1921                                             rogue_test_session)
1922
1923     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1924     def test_mismatched_auth_type(self):
1925         """ session is not brought down by msg with wrong auth type """
1926         key = self.factory.create_random_key(self)
1927         key.add_vpp_config()
1928         vpp_session = VppBFDUDPSession(
1929             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1930         legitimate_test_session = BFDTestSession(
1931             self, self.pg0, AF_INET, sha1_key=key,
1932             bfd_key_id=vpp_session.bfd_key_id)
1933         rogue_test_session = BFDTestSession(
1934             self, self.pg0, AF_INET, sha1_key=key,
1935             bfd_key_id=vpp_session.bfd_key_id)
1936         self.execute_rogue_session_scenario(
1937             vpp_session, legitimate_test_session, rogue_test_session,
1938             {'auth_type': BFDAuthType.keyed_md5})
1939
1940     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1941     def test_restart(self):
1942         """ simulate remote peer restart and resynchronization """
1943         key = self.factory.create_random_key(
1944             self, BFDAuthType.meticulous_keyed_sha1)
1945         key.add_vpp_config()
1946         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1947                                             self.pg0.remote_ip4, sha1_key=key)
1948         self.vpp_session.add_vpp_config()
1949         self.test_session = BFDTestSession(
1950             self, self.pg0, AF_INET, sha1_key=key,
1951             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
1952         bfd_session_up(self)
1953         # don't send any packets for 2*detection_time
1954         detection_time = self.test_session.detect_mult *\
1955             self.vpp_session.required_min_rx / USEC_IN_SEC
1956         self.sleep(2 * detection_time, "simulating peer restart")
1957         events = self.vapi.collect_events()
1958         self.assert_equal(len(events), 1, "number of bfd events")
1959         verify_event(self, events[0], expected_state=BFDState.down)
1960         self.test_session.update(state=BFDState.down)
1961         # reset sequence number
1962         self.test_session.our_seq_number = 0
1963         self.test_session.vpp_seq_number = None
1964         # now throw away any pending packets
1965         self.pg0.enable_capture()
1966         bfd_session_up(self)
1967
1968
1969 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1970 class BFDAuthOnOffTestCase(VppTestCase):
1971     """Bidirectional Forwarding Detection (BFD) (changing auth) """
1972
1973     pg0 = None
1974     vpp_session = None
1975     test_session = None
1976
1977     @classmethod
1978     def setUpClass(cls):
1979         super(BFDAuthOnOffTestCase, cls).setUpClass()
1980         cls.vapi.cli("set log class bfd level debug")
1981         try:
1982             cls.create_pg_interfaces([0])
1983             cls.pg0.config_ip4()
1984             cls.pg0.admin_up()
1985             cls.pg0.resolve_arp()
1986
1987         except Exception:
1988             super(BFDAuthOnOffTestCase, cls).tearDownClass()
1989             raise
1990
1991     def setUp(self):
1992         super(BFDAuthOnOffTestCase, self).setUp()
1993         self.factory = AuthKeyFactory()
1994         self.vapi.want_bfd_events()
1995         self.pg0.enable_capture()
1996
1997     def tearDown(self):
1998         if not self.vpp_dead:
1999             self.vapi.want_bfd_events(enable_disable=0)
2000         self.vapi.collect_events()  # clear the event queue
2001         super(BFDAuthOnOffTestCase, self).tearDown()
2002
2003     def test_auth_on_immediate(self):
2004         """ turn auth on without disturbing session state (immediate) """
2005         key = self.factory.create_random_key(self)
2006         key.add_vpp_config()
2007         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2008                                             self.pg0.remote_ip4)
2009         self.vpp_session.add_vpp_config()
2010         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2011         bfd_session_up(self)
2012         for dummy in range(self.test_session.detect_mult * 2):
2013             p = wait_for_bfd_packet(self)
2014             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2015             self.test_session.send_packet()
2016         self.vpp_session.activate_auth(key)
2017         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2018         self.test_session.sha1_key = key
2019         for dummy in range(self.test_session.detect_mult * 2):
2020             p = wait_for_bfd_packet(self)
2021             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2022             self.test_session.send_packet()
2023         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2024         self.assert_equal(len(self.vapi.collect_events()), 0,
2025                           "number of bfd events")
2026
2027     def test_auth_off_immediate(self):
2028         """ turn auth off without disturbing session state (immediate) """
2029         key = self.factory.create_random_key(self)
2030         key.add_vpp_config()
2031         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2032                                             self.pg0.remote_ip4, sha1_key=key)
2033         self.vpp_session.add_vpp_config()
2034         self.test_session = BFDTestSession(
2035             self, self.pg0, AF_INET, sha1_key=key,
2036             bfd_key_id=self.vpp_session.bfd_key_id)
2037         bfd_session_up(self)
2038         # self.vapi.want_bfd_events(enable_disable=0)
2039         for dummy in range(self.test_session.detect_mult * 2):
2040             p = wait_for_bfd_packet(self)
2041             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2042             self.test_session.inc_seq_num()
2043             self.test_session.send_packet()
2044         self.vpp_session.deactivate_auth()
2045         self.test_session.bfd_key_id = None
2046         self.test_session.sha1_key = None
2047         for dummy in range(self.test_session.detect_mult * 2):
2048             p = wait_for_bfd_packet(self)
2049             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2050             self.test_session.inc_seq_num()
2051             self.test_session.send_packet()
2052         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2053         self.assert_equal(len(self.vapi.collect_events()), 0,
2054                           "number of bfd events")
2055
2056     def test_auth_change_key_immediate(self):
2057         """ change auth key without disturbing session state (immediate) """
2058         key1 = self.factory.create_random_key(self)
2059         key1.add_vpp_config()
2060         key2 = self.factory.create_random_key(self)
2061         key2.add_vpp_config()
2062         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2063                                             self.pg0.remote_ip4, sha1_key=key1)
2064         self.vpp_session.add_vpp_config()
2065         self.test_session = BFDTestSession(
2066             self, self.pg0, AF_INET, sha1_key=key1,
2067             bfd_key_id=self.vpp_session.bfd_key_id)
2068         bfd_session_up(self)
2069         for dummy in range(self.test_session.detect_mult * 2):
2070             p = wait_for_bfd_packet(self)
2071             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2072             self.test_session.send_packet()
2073         self.vpp_session.activate_auth(key2)
2074         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2075         self.test_session.sha1_key = key2
2076         for dummy in range(self.test_session.detect_mult * 2):
2077             p = wait_for_bfd_packet(self)
2078             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2079             self.test_session.send_packet()
2080         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2081         self.assert_equal(len(self.vapi.collect_events()), 0,
2082                           "number of bfd events")
2083
2084     def test_auth_on_delayed(self):
2085         """ turn auth on without disturbing session state (delayed) """
2086         key = self.factory.create_random_key(self)
2087         key.add_vpp_config()
2088         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2089                                             self.pg0.remote_ip4)
2090         self.vpp_session.add_vpp_config()
2091         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2092         bfd_session_up(self)
2093         for dummy in range(self.test_session.detect_mult * 2):
2094             wait_for_bfd_packet(self)
2095             self.test_session.send_packet()
2096         self.vpp_session.activate_auth(key, delayed=True)
2097         for dummy in range(self.test_session.detect_mult * 2):
2098             p = wait_for_bfd_packet(self)
2099             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2100             self.test_session.send_packet()
2101         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2102         self.test_session.sha1_key = key
2103         self.test_session.send_packet()
2104         for dummy in range(self.test_session.detect_mult * 2):
2105             p = wait_for_bfd_packet(self)
2106             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2107             self.test_session.send_packet()
2108         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2109         self.assert_equal(len(self.vapi.collect_events()), 0,
2110                           "number of bfd events")
2111
2112     def test_auth_off_delayed(self):
2113         """ turn auth off without disturbing session state (delayed) """
2114         key = self.factory.create_random_key(self)
2115         key.add_vpp_config()
2116         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2117                                             self.pg0.remote_ip4, sha1_key=key)
2118         self.vpp_session.add_vpp_config()
2119         self.test_session = BFDTestSession(
2120             self, self.pg0, AF_INET, sha1_key=key,
2121             bfd_key_id=self.vpp_session.bfd_key_id)
2122         bfd_session_up(self)
2123         for dummy in range(self.test_session.detect_mult * 2):
2124             p = wait_for_bfd_packet(self)
2125             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2126             self.test_session.send_packet()
2127         self.vpp_session.deactivate_auth(delayed=True)
2128         for dummy in range(self.test_session.detect_mult * 2):
2129             p = wait_for_bfd_packet(self)
2130             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2131             self.test_session.send_packet()
2132         self.test_session.bfd_key_id = None
2133         self.test_session.sha1_key = None
2134         self.test_session.send_packet()
2135         for dummy in range(self.test_session.detect_mult * 2):
2136             p = wait_for_bfd_packet(self)
2137             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2138             self.test_session.send_packet()
2139         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2140         self.assert_equal(len(self.vapi.collect_events()), 0,
2141                           "number of bfd events")
2142
2143     def test_auth_change_key_delayed(self):
2144         """ change auth key without disturbing session state (delayed) """
2145         key1 = self.factory.create_random_key(self)
2146         key1.add_vpp_config()
2147         key2 = self.factory.create_random_key(self)
2148         key2.add_vpp_config()
2149         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2150                                             self.pg0.remote_ip4, sha1_key=key1)
2151         self.vpp_session.add_vpp_config()
2152         self.vpp_session.admin_up()
2153         self.test_session = BFDTestSession(
2154             self, self.pg0, AF_INET, sha1_key=key1,
2155             bfd_key_id=self.vpp_session.bfd_key_id)
2156         bfd_session_up(self)
2157         for dummy in range(self.test_session.detect_mult * 2):
2158             p = wait_for_bfd_packet(self)
2159             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2160             self.test_session.send_packet()
2161         self.vpp_session.activate_auth(key2, delayed=True)
2162         for dummy in range(self.test_session.detect_mult * 2):
2163             p = wait_for_bfd_packet(self)
2164             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2165             self.test_session.send_packet()
2166         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2167         self.test_session.sha1_key = key2
2168         self.test_session.send_packet()
2169         for dummy in range(self.test_session.detect_mult * 2):
2170             p = wait_for_bfd_packet(self)
2171             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2172             self.test_session.send_packet()
2173         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2174         self.assert_equal(len(self.vapi.collect_events()), 0,
2175                           "number of bfd events")
2176
2177
2178 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2179 class BFDCLITestCase(VppTestCase):
2180     """Bidirectional Forwarding Detection (BFD) (CLI) """
2181     pg0 = None
2182
2183     @classmethod
2184     def setUpClass(cls):
2185         super(BFDCLITestCase, cls).setUpClass()
2186         cls.vapi.cli("set log class bfd level debug")
2187         try:
2188             cls.create_pg_interfaces((0,))
2189             cls.pg0.config_ip4()
2190             cls.pg0.config_ip6()
2191             cls.pg0.resolve_arp()
2192             cls.pg0.resolve_ndp()
2193
2194         except Exception:
2195             super(BFDCLITestCase, cls).tearDownClass()
2196             raise
2197
2198     def setUp(self):
2199         super(BFDCLITestCase, self).setUp()
2200         self.factory = AuthKeyFactory()
2201         self.pg0.enable_capture()
2202
2203     def tearDown(self):
2204         try:
2205             self.vapi.want_bfd_events(enable_disable=0)
2206         except UnexpectedApiReturnValueError:
2207             # some tests aren't subscribed, so this is not an issue
2208             pass
2209         self.vapi.collect_events()  # clear the event queue
2210         super(BFDCLITestCase, self).tearDown()
2211
2212     def cli_verify_no_response(self, cli):
2213         """ execute a CLI, asserting that the response is empty """
2214         self.assert_equal(self.vapi.cli(cli),
2215                           "",
2216                           "CLI command response")
2217
2218     def cli_verify_response(self, cli, expected):
2219         """ execute a CLI, asserting that the response matches expectation """
2220         self.assert_equal(self.vapi.cli(cli).strip(),
2221                           expected,
2222                           "CLI command response")
2223
2224     def test_show(self):
2225         """ show commands """
2226         k1 = self.factory.create_random_key(self)
2227         k1.add_vpp_config()
2228         k2 = self.factory.create_random_key(
2229             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2230         k2.add_vpp_config()
2231         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2232         s1.add_vpp_config()
2233         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2234                               sha1_key=k2)
2235         s2.add_vpp_config()
2236         self.logger.info(self.vapi.ppcli("show bfd keys"))
2237         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2238         self.logger.info(self.vapi.ppcli("show bfd"))
2239
2240     def test_set_del_sha1_key(self):
2241         """ set/delete SHA1 auth key """
2242         k = self.factory.create_random_key(self)
2243         self.registry.register(k, self.logger)
2244         self.cli_verify_no_response(
2245             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2246             (k.conf_key_id,
2247                 "".join("{:02x}".format(ord(c)) for c in k.key)))
2248         self.assertTrue(k.query_vpp_config())
2249         self.vpp_session = VppBFDUDPSession(
2250             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2251         self.vpp_session.add_vpp_config()
2252         self.test_session = \
2253             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2254                            bfd_key_id=self.vpp_session.bfd_key_id)
2255         self.vapi.want_bfd_events()
2256         bfd_session_up(self)
2257         bfd_session_down(self)
2258         # try to replace the secret for the key - should fail because the key
2259         # is in-use
2260         k2 = self.factory.create_random_key(self)
2261         self.cli_verify_response(
2262             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2263             (k.conf_key_id,
2264                 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2265             "bfd key set: `bfd_auth_set_key' API call failed, "
2266             "rv=-103:BFD object in use")
2267         # manipulating the session using old secret should still work
2268         bfd_session_up(self)
2269         bfd_session_down(self)
2270         self.vpp_session.remove_vpp_config()
2271         self.cli_verify_no_response(
2272             "bfd key del conf-key-id %s" % k.conf_key_id)
2273         self.assertFalse(k.query_vpp_config())
2274
2275     def test_set_del_meticulous_sha1_key(self):
2276         """ set/delete meticulous SHA1 auth key """
2277         k = self.factory.create_random_key(
2278             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2279         self.registry.register(k, self.logger)
2280         self.cli_verify_no_response(
2281             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2282             (k.conf_key_id,
2283                 "".join("{:02x}".format(ord(c)) for c in k.key)))
2284         self.assertTrue(k.query_vpp_config())
2285         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2286                                             self.pg0.remote_ip6, af=AF_INET6,
2287                                             sha1_key=k)
2288         self.vpp_session.add_vpp_config()
2289         self.vpp_session.admin_up()
2290         self.test_session = \
2291             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2292                            bfd_key_id=self.vpp_session.bfd_key_id)
2293         self.vapi.want_bfd_events()
2294         bfd_session_up(self)
2295         bfd_session_down(self)
2296         # try to replace the secret for the key - should fail because the key
2297         # is in-use
2298         k2 = self.factory.create_random_key(self)
2299         self.cli_verify_response(
2300             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2301             (k.conf_key_id,
2302                 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2303             "bfd key set: `bfd_auth_set_key' API call failed, "
2304             "rv=-103:BFD object in use")
2305         # manipulating the session using old secret should still work
2306         bfd_session_up(self)
2307         bfd_session_down(self)
2308         self.vpp_session.remove_vpp_config()
2309         self.cli_verify_no_response(
2310             "bfd key del conf-key-id %s" % k.conf_key_id)
2311         self.assertFalse(k.query_vpp_config())
2312
2313     def test_add_mod_del_bfd_udp(self):
2314         """ create/modify/delete IPv4 BFD UDP session """
2315         vpp_session = VppBFDUDPSession(
2316             self, self.pg0, self.pg0.remote_ip4)
2317         self.registry.register(vpp_session, self.logger)
2318         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2319             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2320             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2321                                 self.pg0.remote_ip4,
2322                                 vpp_session.desired_min_tx,
2323                                 vpp_session.required_min_rx,
2324                                 vpp_session.detect_mult)
2325         self.cli_verify_no_response(cli_add_cmd)
2326         # 2nd add should fail
2327         self.cli_verify_response(
2328             cli_add_cmd,
2329             "bfd udp session add: `bfd_add_add_session' API call"
2330             " failed, rv=-101:Duplicate BFD object")
2331         verify_bfd_session_config(self, vpp_session)
2332         mod_session = VppBFDUDPSession(
2333             self, self.pg0, self.pg0.remote_ip4,
2334             required_min_rx=2 * vpp_session.required_min_rx,
2335             desired_min_tx=3 * vpp_session.desired_min_tx,
2336             detect_mult=4 * vpp_session.detect_mult)
2337         self.cli_verify_no_response(
2338             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2339             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2340             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2341              mod_session.desired_min_tx, mod_session.required_min_rx,
2342              mod_session.detect_mult))
2343         verify_bfd_session_config(self, mod_session)
2344         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2345             "peer-addr %s" % (self.pg0.name,
2346                               self.pg0.local_ip4, self.pg0.remote_ip4)
2347         self.cli_verify_no_response(cli_del_cmd)
2348         # 2nd del is expected to fail
2349         self.cli_verify_response(
2350             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2351             " failed, rv=-102:No such BFD object")
2352         self.assertFalse(vpp_session.query_vpp_config())
2353
2354     def test_add_mod_del_bfd_udp6(self):
2355         """ create/modify/delete IPv6 BFD UDP session """
2356         vpp_session = VppBFDUDPSession(
2357             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2358         self.registry.register(vpp_session, self.logger)
2359         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2360             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2361             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2362                                 self.pg0.remote_ip6,
2363                                 vpp_session.desired_min_tx,
2364                                 vpp_session.required_min_rx,
2365                                 vpp_session.detect_mult)
2366         self.cli_verify_no_response(cli_add_cmd)
2367         # 2nd add should fail
2368         self.cli_verify_response(
2369             cli_add_cmd,
2370             "bfd udp session add: `bfd_add_add_session' API call"
2371             " failed, rv=-101:Duplicate BFD object")
2372         verify_bfd_session_config(self, vpp_session)
2373         mod_session = VppBFDUDPSession(
2374             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2375             required_min_rx=2 * vpp_session.required_min_rx,
2376             desired_min_tx=3 * vpp_session.desired_min_tx,
2377             detect_mult=4 * vpp_session.detect_mult)
2378         self.cli_verify_no_response(
2379             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2380             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2381             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2382              mod_session.desired_min_tx,
2383              mod_session.required_min_rx, mod_session.detect_mult))
2384         verify_bfd_session_config(self, mod_session)
2385         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2386             "peer-addr %s" % (self.pg0.name,
2387                               self.pg0.local_ip6, self.pg0.remote_ip6)
2388         self.cli_verify_no_response(cli_del_cmd)
2389         # 2nd del is expected to fail
2390         self.cli_verify_response(
2391             cli_del_cmd,
2392             "bfd udp session del: `bfd_udp_del_session' API call"
2393             " failed, rv=-102:No such BFD object")
2394         self.assertFalse(vpp_session.query_vpp_config())
2395
2396     def test_add_mod_del_bfd_udp_auth(self):
2397         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2398         key = self.factory.create_random_key(self)
2399         key.add_vpp_config()
2400         vpp_session = VppBFDUDPSession(
2401             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2402         self.registry.register(vpp_session, self.logger)
2403         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2404             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2405             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2406             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2407                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2408                vpp_session.detect_mult, key.conf_key_id,
2409                vpp_session.bfd_key_id)
2410         self.cli_verify_no_response(cli_add_cmd)
2411         # 2nd add should fail
2412         self.cli_verify_response(
2413             cli_add_cmd,
2414             "bfd udp session add: `bfd_add_add_session' API call"
2415             " failed, rv=-101:Duplicate BFD object")
2416         verify_bfd_session_config(self, vpp_session)
2417         mod_session = VppBFDUDPSession(
2418             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2419             bfd_key_id=vpp_session.bfd_key_id,
2420             required_min_rx=2 * vpp_session.required_min_rx,
2421             desired_min_tx=3 * vpp_session.desired_min_tx,
2422             detect_mult=4 * vpp_session.detect_mult)
2423         self.cli_verify_no_response(
2424             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2425             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2426             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2427              mod_session.desired_min_tx,
2428              mod_session.required_min_rx, mod_session.detect_mult))
2429         verify_bfd_session_config(self, mod_session)
2430         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2431             "peer-addr %s" % (self.pg0.name,
2432                               self.pg0.local_ip4, self.pg0.remote_ip4)
2433         self.cli_verify_no_response(cli_del_cmd)
2434         # 2nd del is expected to fail
2435         self.cli_verify_response(
2436             cli_del_cmd,
2437             "bfd udp session del: `bfd_udp_del_session' API call"
2438             " failed, rv=-102:No such BFD object")
2439         self.assertFalse(vpp_session.query_vpp_config())
2440
2441     def test_add_mod_del_bfd_udp6_auth(self):
2442         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2443         key = self.factory.create_random_key(
2444             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2445         key.add_vpp_config()
2446         vpp_session = VppBFDUDPSession(
2447             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2448         self.registry.register(vpp_session, self.logger)
2449         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2450             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2451             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2452             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2453                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2454                vpp_session.detect_mult, key.conf_key_id,
2455                vpp_session.bfd_key_id)
2456         self.cli_verify_no_response(cli_add_cmd)
2457         # 2nd add should fail
2458         self.cli_verify_response(
2459             cli_add_cmd,
2460             "bfd udp session add: `bfd_add_add_session' API call"
2461             " failed, rv=-101:Duplicate BFD object")
2462         verify_bfd_session_config(self, vpp_session)
2463         mod_session = VppBFDUDPSession(
2464             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2465             bfd_key_id=vpp_session.bfd_key_id,
2466             required_min_rx=2 * vpp_session.required_min_rx,
2467             desired_min_tx=3 * vpp_session.desired_min_tx,
2468             detect_mult=4 * vpp_session.detect_mult)
2469         self.cli_verify_no_response(
2470             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2471             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2472             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2473              mod_session.desired_min_tx,
2474              mod_session.required_min_rx, mod_session.detect_mult))
2475         verify_bfd_session_config(self, mod_session)
2476         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2477             "peer-addr %s" % (self.pg0.name,
2478                               self.pg0.local_ip6, self.pg0.remote_ip6)
2479         self.cli_verify_no_response(cli_del_cmd)
2480         # 2nd del is expected to fail
2481         self.cli_verify_response(
2482             cli_del_cmd,
2483             "bfd udp session del: `bfd_udp_del_session' API call"
2484             " failed, rv=-102:No such BFD object")
2485         self.assertFalse(vpp_session.query_vpp_config())
2486
2487     def test_auth_on_off(self):
2488         """ turn authentication on and off """
2489         key = self.factory.create_random_key(
2490             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2491         key.add_vpp_config()
2492         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2493         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2494                                         sha1_key=key)
2495         session.add_vpp_config()
2496         cli_activate = \
2497             "bfd udp session auth activate interface %s local-addr %s "\
2498             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2499             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2500                key.conf_key_id, auth_session.bfd_key_id)
2501         self.cli_verify_no_response(cli_activate)
2502         verify_bfd_session_config(self, auth_session)
2503         self.cli_verify_no_response(cli_activate)
2504         verify_bfd_session_config(self, auth_session)
2505         cli_deactivate = \
2506             "bfd udp session auth deactivate interface %s local-addr %s "\
2507             "peer-addr %s "\
2508             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2509         self.cli_verify_no_response(cli_deactivate)
2510         verify_bfd_session_config(self, session)
2511         self.cli_verify_no_response(cli_deactivate)
2512         verify_bfd_session_config(self, session)
2513
2514     def test_auth_on_off_delayed(self):
2515         """ turn authentication on and off (delayed) """
2516         key = self.factory.create_random_key(
2517             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2518         key.add_vpp_config()
2519         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2520         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2521                                         sha1_key=key)
2522         session.add_vpp_config()
2523         cli_activate = \
2524             "bfd udp session auth activate interface %s local-addr %s "\
2525             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2526             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2527                key.conf_key_id, auth_session.bfd_key_id)
2528         self.cli_verify_no_response(cli_activate)
2529         verify_bfd_session_config(self, auth_session)
2530         self.cli_verify_no_response(cli_activate)
2531         verify_bfd_session_config(self, auth_session)
2532         cli_deactivate = \
2533             "bfd udp session auth deactivate interface %s local-addr %s "\
2534             "peer-addr %s delayed yes"\
2535             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2536         self.cli_verify_no_response(cli_deactivate)
2537         verify_bfd_session_config(self, session)
2538         self.cli_verify_no_response(cli_deactivate)
2539         verify_bfd_session_config(self, session)
2540
2541     def test_admin_up_down(self):
2542         """ put session admin-up and admin-down """
2543         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2544         session.add_vpp_config()
2545         cli_down = \
2546             "bfd udp session set-flags admin down interface %s local-addr %s "\
2547             "peer-addr %s "\
2548             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2549         cli_up = \
2550             "bfd udp session set-flags admin up interface %s local-addr %s "\
2551             "peer-addr %s "\
2552             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2553         self.cli_verify_no_response(cli_down)
2554         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2555         self.cli_verify_no_response(cli_up)
2556         verify_bfd_session_config(self, session, state=BFDState.down)
2557
2558     def test_set_del_udp_echo_source(self):
2559         """ set/del udp echo source """
2560         self.create_loopback_interfaces(1)
2561         self.loopback0 = self.lo_interfaces[0]
2562         self.loopback0.admin_up()
2563         self.cli_verify_response("show bfd echo-source",
2564                                  "UDP echo source is not set.")
2565         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2566         self.cli_verify_no_response(cli_set)
2567         self.cli_verify_response("show bfd echo-source",
2568                                  "UDP echo source is: %s\n"
2569                                  "IPv4 address usable as echo source: none\n"
2570                                  "IPv6 address usable as echo source: none" %
2571                                  self.loopback0.name)
2572         self.loopback0.config_ip4()
2573         unpacked = unpack("!L", self.loopback0.local_ip4n)
2574         echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2575         self.cli_verify_response("show bfd echo-source",
2576                                  "UDP echo source is: %s\n"
2577                                  "IPv4 address usable as echo source: %s\n"
2578                                  "IPv6 address usable as echo source: none" %
2579                                  (self.loopback0.name, echo_ip4))
2580         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2581         echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2582                                             unpacked[2], unpacked[3] ^ 1))
2583         self.loopback0.config_ip6()
2584         self.cli_verify_response("show bfd echo-source",
2585                                  "UDP echo source is: %s\n"
2586                                  "IPv4 address usable as echo source: %s\n"
2587                                  "IPv6 address usable as echo source: %s" %
2588                                  (self.loopback0.name, echo_ip4, echo_ip6))
2589         cli_del = "bfd udp echo-source del"
2590         self.cli_verify_no_response(cli_del)
2591         self.cli_verify_response("show bfd echo-source",
2592                                  "UDP echo source is not set.")
2593
2594 if __name__ == '__main__':
2595     unittest.main(testRunner=VppTestRunner)