IP-MAC,ND:wildcard events,fix sending multiple events
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5 import unittest
6 import hashlib
7 import binascii
8 import time
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17     BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
20 from vpp_lo_interface import VppLoInterface
21 from util import ppp
22 from vpp_papi_provider import UnexpectedApiReturnValueError
23 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
24
25 USEC_IN_SEC = 1000000
26
27
class AuthKeyFactory(object):
    """Hand out BFD auth keys whose conf key IDs never repeat.

    Uniqueness is tracked per factory instance: every conf key ID that this
    instance has issued is remembered and never issued again.
    """

    def __init__(self):
        # conf key IDs issued so far; only the dict keys are meaningful
        self._conf_key_ids = {}

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ build an auth key with random secret and an unused conf key id

        :param test: test case handed over to VppBFDAuthKey
        :param auth_type: authentication type of the new key

        :returns: newly constructed VppBFDAuthKey
        """
        # draw 32-bit IDs until one turns up that was not issued before
        while True:
            conf_key_id = randint(0, 0xFFFFFFFF)
            if conf_key_id not in self._conf_key_ids:
                break
        self._conf_key_ids[conf_key_id] = 1
        # the secret is 1 to 20 random bytes
        secret_length = randint(1, 20)
        secret = bytearray(randint(0, 255) for _ in range(secret_length))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=conf_key_id, key=str(secret))
43
44
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    # interfaces used by the tests; None until the pg interfaces are created
    # (presumably by create_pg_interfaces() in setUpClass)
    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        """ create two pg interfaces with IPv4/IPv6 config and resolved ARP """
        super(BFDAPITestCase, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))
            for i in cls.pg_interfaces:
                i.config_ip4()
                i.config_ip6()
                i.resolve_arp()

        except Exception:
            # class-level setup failed - run the parent's class teardown
            # before letting the error propagate
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ give every test its own auth key factory """
        super(BFDAPITestCase, self).setUp()
        self.factory = AuthKeyFactory()

    def _add_and_remove_twice(self, session):
        """ add the session to vpp and remove it again, two times in a row

        :param session: VppBFDUDPSession to exercise
        """
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def _verify_dumped_parameters(self, session):
        """ compare timing parameters of session with its vpp dump entry

        :param session: VppBFDUDPSession which is already configured in vpp
        """
        s = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          s.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          s.required_min_rx,
                          "required min receive interval")
        self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")

    def test_add_bfd(self):
        """ create a BFD session """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        self._add_and_remove_twice(session)

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        # the second add of an identical session must be rejected by the API
        with self.vapi.expect_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        self._add_and_remove_twice(session)

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)
        session.add_vpp_config()
        self._verify_dumped_parameters(session)
        # double every parameter and check that the dump follows
        session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
                                  required_min_rx=session.required_min_rx * 2,
                                  detect_mult=session.detect_mult * 2)
        self._verify_dumped_parameters(session)

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(self)
                for _ in range(key_count)]
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        # remove randomly
        # shuffle() works in place and needs a mutable sequence, so make an
        # explicit list instead of relying on range() returning one
        indexes = list(range(key_count))
        shuffle(indexes)
        removed = set()
        for i in indexes:
            keys[i].remove_vpp_config()
            removed.add(i)
            # after each removal only the not-yet-removed keys may be present
            for j, key in enumerate(keys):
                if j in removed:
                    self.assertFalse(key.query_vpp_config())
                else:
                    self.assertTrue(key.query_vpp_config())
        # should be removed now
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        for key in keys:
            key.remove_vpp_config()
        for key in keys:
            self.assertFalse(key.query_vpp_config())

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        self._add_and_remove_twice(session)

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        session.add_vpp_config()
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key is created locally but never added to vpp
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            sha1_key=self.factory.create_random_key(self))
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ share single SHA1 key between multiple BFD sessions """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             sha1_key=key, af=AF_INET6),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
                             sha1_key=key, af=AF_INET6)]
        for s in sessions:
            s.add_vpp_config()
        # the key's use count must drop by one with every removed session
        removed = 0
        for s in sessions:
            e = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(e.use_count, len(sessions) - removed,
                              "Use count for shared key")
            s.remove_vpp_config()
            removed += 1
        e = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(e.use_count, len(sessions) - removed,
                          "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        key1 = self.factory.create_random_key(self)
        key2 = self.factory.create_random_key(self)
        while key2.conf_key_id == key1.conf_key_id:
            key2 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key1)
        session.add_vpp_config()
        session.activate_auth(key2)
254
class BFDTestSession(object):
    """ BFD session as seen from test framework side """

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None):
        """
        :param test: test case; its logger, pg0 and assert helpers are used
        :param interface: interface whose addresses are put into packets
        :param af: address family, AF_INET6 selects IPv6, anything else IPv4
        :param detect_mult: detect multiplier put into sent packets
        :param sha1_key: auth key; when set, packets carry SHA1 auth section
        :param bfd_key_id: key ID put into / expected in the auth section
        :param our_seq_number: initial auth sequence number, random if None
        """
        self.test = test
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        self.udp_sport = randint(49152, 65535)
        if our_seq_number is None:
            self.our_seq_number = randint(0, 40000000)
        else:
            self.our_seq_number = our_seq_number
        # last auth sequence number seen from vpp, None until first packet
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.your_discriminator = None
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth

    def inc_seq_num(self):
        """ increment sequence number, wrapping if needed """
        if self.our_seq_number == 0xFFFFFFFF:
            self.our_seq_number = 0
        else:
            self.our_seq_number += 1

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None,
               required_min_echo_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ update BFD parameters associated with session

        Only arguments which are not None are stored, every other attribute
        keeps its current value.
        """
        new_values = (
            ("my_discriminator", my_discriminator),
            ("your_discriminator", your_discriminator),
            ("required_min_rx", required_min_rx),
            ("required_min_echo_rx", required_min_echo_rx),
            ("desired_min_tx", desired_min_tx),
            ("detect_mult", detect_mult),
            ("diag", diag),
            ("state", state),
            ("auth_type", auth_type),
        )
        for name, value in new_values:
            if value is not None:
                setattr(self, name, value)

    def fill_packet_fields(self, packet):
        """ set packet fields with known values in packet

        Only truthy session values are copied, so fields whose session value
        is None or 0 keep whatever the BFD layer already contains.

        :param packet: packet containing a BFD layer, modified in place
        """
        bfd = packet[BFD]
        if self.my_discriminator:
            self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
                                   self.my_discriminator)
            bfd.my_discriminator = self.my_discriminator
        if self.your_discriminator:
            self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
                                   self.your_discriminator)
            bfd.your_discriminator = self.your_discriminator
        if self.required_min_rx:
            self.test.logger.debug(
                "BFD: setting packet.required_min_rx_interval=%s",
                self.required_min_rx)
            bfd.required_min_rx_interval = self.required_min_rx
        if self.required_min_echo_rx:
            self.test.logger.debug(
                "BFD: setting packet.required_min_echo_rx=%s",
                self.required_min_echo_rx)
            bfd.required_min_echo_rx_interval = self.required_min_echo_rx
        if self.desired_min_tx:
            self.test.logger.debug(
                "BFD: setting packet.desired_min_tx_interval=%s",
                self.desired_min_tx)
            bfd.desired_min_tx_interval = self.desired_min_tx
        if self.detect_mult:
            self.test.logger.debug(
                "BFD: setting packet.detect_mult=%s", self.detect_mult)
            bfd.detect_mult = self.detect_mult
        if self.diag:
            self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
            bfd.diag = self.diag
        if self.state:
            self.test.logger.debug("BFD: setting packet.state=%s", self.state)
            bfd.state = self.state
        if self.auth_type:
            # this is used by a negative test-case
            self.test.logger.debug("BFD: setting packet.auth_type=%s",
                                   self.auth_type)
            bfd.auth_type = self.auth_type

    def create_packet(self):
        """ create a BFD packet, reflecting the current state of session

        :returns: Ether / IP or IPv6 / UDP / BFD packet addressed from the
                  remote side of the interface to its local side
        """
        if self.sha1_key:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()
        # only the network layer differs between the address families
        if self.af == AF_INET6:
            ip = IPv6(src=self.interface.remote_ip6,
                      dst=self.interface.local_ip6,
                      hlim=255)
        else:
            ip = IP(src=self.interface.remote_ip4,
                    dst=self.interface.local_ip4,
                    ttl=255)
        packet = (Ether(src=self.interface.remote_mac,
                        dst=self.interface.local_mac) /
                  ip /
                  UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                  bfd)
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash the first 32 bytes of the BFD layer followed by the key
            # zero-padded to 20 bytes (presumably everything up to the hash
            # field - TODO confirm against the BFD layer definition)
            hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
                "\0" * (20 - len(self.sha1_key.key))
            # compute the digest once and use it for both log and packet
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ send packet on interface, creating the packet if needed

        :param packet: packet to send, create_packet() result if None
        :param interface: interface to send on, the test's pg0 if None
        """
        if packet is None:
            packet = self.create_packet()
        if interface is None:
            interface = self.test.pg0
        self.test.logger.debug(ppp("Sending packet:", packet))
        interface.add_stream(packet)
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ Verify correctness of authentication in BFD layer.

        Checks the auth section header, the progression of the sequence
        number relative to the previously received packet and the hash.

        :param packet: received packet containing a BFD layer
        """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
        self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
                               BFDAuthType)
        self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
        if self.vpp_seq_number is None:
            # first packet - nothing to compare against, just remember it
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug("Received initial sequence number: %s" %
                                   self.vpp_seq_number)
        else:
            recvd_seq_num = bfd.auth_seq_num
            self.test.logger.debug("Received followup sequence number: %s" %
                                   recvd_seq_num)
            # meticulous mode must increment with every packet, the
            # non-meticulous mode may also repeat the previous number
            meticulous = self.sha1_key.auth_type == \
                BFDAuthType.meticulous_keyed_sha1
            if self.vpp_seq_number < 0xffffffff:
                if meticulous:
                    self.test.assert_equal(recvd_seq_num,
                                           self.vpp_seq_number + 1,
                                           "BFD sequence number")
                else:
                    self.test.assert_in_range(recvd_seq_num,
                                              self.vpp_seq_number,
                                              self.vpp_seq_number + 1,
                                              "BFD sequence number")
            else:
                # previous number was the maximum, so the next one wraps to 0
                if meticulous:
                    self.test.assert_equal(recvd_seq_num, 0,
                                           "BFD sequence number")
                else:
                    self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
                                       "BFD sequence number not one of "
                                       "(%s, 0)" % self.vpp_seq_number)
            self.vpp_seq_number = recvd_seq_num
        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        hash_material = bfd.original[:-20] + self.sha1_key.key + \
            "\0" * (20 - len(self.sha1_key.key))
        expected_hash = hashlib.sha1(hash_material).hexdigest()
        self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
                               expected_hash, "Auth key hash")

    def verify_bfd(self, packet):
        """ Verify correctness of BFD layer.

        :param packet: received packet containing a BFD layer
        """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(bfd.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
456
457
def bfd_session_up(test):
    """ Bring BFD session up

    Waits for a packet from vpp, answers with Init, waits for the Up event
    and then sends an Up packet. Also (re)computes test.vpp_clock_offset and
    checks that it is stable if an offset was already known.

    :param test: test case providing test_session, vpp_session, vapi, logger
    """
    test.logger.info("BFD: Waiting for slow hello")
    p = wait_for_bfd_packet(test, 2)
    old_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - p.time
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    # compare against None explicitly - a previous offset of exactly 0 is a
    # valid value and must not bypass the stability check
    if old_offset is not None:
        test.assertAlmostEqual(
            old_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, old_offset))
    session = test.test_session
    # with meticulous authentication the sequence number is incremented
    # before every packet sent
    meticulous = session.sha1_key and session.sha1_key.auth_type == \
        BFDAuthType.meticulous_keyed_sha1
    test.logger.info("BFD: Sending Init")
    session.update(my_discriminator=randint(0, 40000000),
                   your_discriminator=p[BFD].my_discriminator,
                   state=BFDState.init)
    if meticulous:
        session.inc_seq_num()
    session.send_packet()
    test.logger.info("BFD: Waiting for event")
    e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, e, expected_state=BFDState.up)
    test.logger.info("BFD: Session is Up")
    session.update(state=BFDState.up)
    if meticulous:
        session.inc_seq_num()
    session.send_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
491
492
def bfd_session_down(test):
    """ Bring BFD session down

    The vpp session has to be Up on entry. A Down packet is sent from the
    test side, then the Down event and the resulting vpp state are checked.

    :param test: test case providing test_session, vpp_session, vapi, logger
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    session = test.test_session
    session.update(state=BFDState.down)
    key = session.sha1_key
    # meticulous authentication needs a new sequence number for each packet
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        session.inc_seq_num()
    session.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
506
507
def verify_bfd_session_config(test, session, state=None):
    """ Verify that the vpp dump of a session matches the session object

    :param test: test case providing the assert helpers
    :param session: session object to compare with its vpp dump entry
    :param state: expected session state, the state is not checked if None
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # explicit None check - an expected state with numeric value 0 has to be
    # verified too instead of being treated as "no state given"
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(dump.required_min_rx, session.required_min_rx,
                      "required min rx interval")
    test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
                      "desired min tx interval")
    test.assert_equal(dump.detect_mult, session.detect_mult,
                      "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        # authenticated session - both key IDs have to match as well
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
                          "bfd key id")
        test.assert_equal(dump.conf_key_id,
                          session.sha1_key.conf_key_id,
                          "config key id")
530
531
def verify_ip(test, packet):
    """ Verify correctness of IP layer.

    The hop limit/TTL has to be 255 and the packet has to travel from the
    local to the remote address of the test's pg0 interface.

    :param test: test case providing vpp_session, pg0 and assert helpers
    :param packet: captured packet
    """
    pg0 = test.pg0
    if test.vpp_session.af == AF_INET6:
        ip = packet[IPv6]
        expected_src, expected_dst = pg0.local_ip6, pg0.remote_ip6
        test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
    else:
        ip = packet[IP]
        expected_src, expected_dst = pg0.local_ip4, pg0.remote_ip4
        test.assert_equal(ip.ttl, 255, "IPv4 TTL")
    test.assert_equal(ip.src, expected_src, "IP source address")
    test.assert_equal(ip.dst, expected_dst, "IP destination address")
546
547
def verify_udp(test, packet):
    """ Verify correctness of UDP layer.

    :param test: test case providing the assert helpers
    :param packet: captured packet
    """
    udp_layer = packet[UDP]
    # destination is the fixed BFD port
    test.assert_equal(udp_layer.dport, BFD.udp_dport, "UDP destination port")
    # source port only has to fall within the allowed range
    test.assert_in_range(
        udp_layer.sport, BFD.udp_sport_min, BFD.udp_sport_max,
        "UDP source port")
554
555
def verify_event(test, event, expected_state):
    """ Verify correctness of event values.

    :param test: test case providing vpp_session, logger and assert helpers
    :param event: received event
    :param expected_state: state which the event has to report
    """
    session = test.vpp_session
    test.logger.debug("BFD: Event: %s" % repr(event))
    test.assert_equal(event.sw_if_index,
                      session.interface.sw_if_index,
                      "BFD interface index")
    expected_is_ipv6 = 1 if session.af == AF_INET6 else 0
    test.assert_equal(event.is_ipv6, expected_is_ipv6, "is_ipv6")
    if session.af == AF_INET:
        # only the first 4 bytes of the address fields are compared for IPv4
        test.assert_equal(event.local_addr[:4], session.local_addr_n,
                          "Local IPv4 address")
        test.assert_equal(event.peer_addr[:4], session.peer_addr_n,
                          "Peer IPv4 address")
    else:
        test.assert_equal(event.local_addr, session.local_addr_n,
                          "Local IPv6 address")
        test.assert_equal(event.peer_addr, session.peer_addr_n,
                          "Peer IPv6 address")
    test.assert_equal(event.state, expected_state, BFDState)
578
579
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
    """ wait for BFD packet and verify its correctness

    :param test: test case providing pg0, test_session and logger
    :param timeout: how long to wait
    :param pcap_time_min: ignore packets with pcap timestamp lower than this

    :returns: the received packet
    :raises CaptureTimeoutError: if no acceptable packet arrived in time
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    # indexing a scapy packet by a layer which is not present raises
    # IndexError instead of returning None, so test for the layer explicitly
    # to get the descriptive error below
    if not p.haslayer(BFD):
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    bfd = p[BFD]
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
615
616
617 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
618 class BFD4TestCase(VppTestCase):
619     """Bidirectional Forwarding Detection (BFD)"""
620
621     pg0 = None
622     vpp_clock_offset = None
623     vpp_session = None
624     test_session = None
625
626     @classmethod
627     def setUpClass(cls):
628         super(BFD4TestCase, cls).setUpClass()
629         try:
630             cls.create_pg_interfaces([0])
631             cls.create_loopback_interfaces([0])
632             cls.loopback0 = cls.lo_interfaces[0]
633             cls.loopback0.config_ip4()
634             cls.loopback0.admin_up()
635             cls.pg0.config_ip4()
636             cls.pg0.configure_ipv4_neighbors()
637             cls.pg0.admin_up()
638             cls.pg0.resolve_arp()
639
640         except Exception:
641             super(BFD4TestCase, cls).tearDownClass()
642             raise
643
644     def setUp(self):
645         super(BFD4TestCase, self).setUp()
646         self.factory = AuthKeyFactory()
647         self.vapi.want_bfd_events()
648         self.pg0.enable_capture()
649         try:
650             self.vpp_session = VppBFDUDPSession(self, self.pg0,
651                                                 self.pg0.remote_ip4)
652             self.vpp_session.add_vpp_config()
653             self.vpp_session.admin_up()
654             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
655         except:
656             self.vapi.want_bfd_events(enable_disable=0)
657             raise
658
659     def tearDown(self):
660         if not self.vpp_dead:
661             self.vapi.want_bfd_events(enable_disable=0)
662         self.vapi.collect_events()  # clear the event queue
663         super(BFD4TestCase, self).tearDown()
664
665     def test_session_up(self):
666         """ bring BFD session up """
667         bfd_session_up(self)
668
669     def test_session_up_by_ip(self):
670         """ bring BFD session up - first frame looked up by address pair """
671         self.logger.info("BFD: Sending Slow control frame")
672         self.test_session.update(my_discriminator=randint(0, 40000000))
673         self.test_session.send_packet()
674         self.pg0.enable_capture()
675         p = self.pg0.wait_for_packet(1)
676         self.assert_equal(p[BFD].your_discriminator,
677                           self.test_session.my_discriminator,
678                           "BFD - your discriminator")
679         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
680         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
681                                  state=BFDState.up)
682         self.logger.info("BFD: Waiting for event")
683         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
684         verify_event(self, e, expected_state=BFDState.init)
685         self.logger.info("BFD: Sending Up")
686         self.test_session.send_packet()
687         self.logger.info("BFD: Waiting for event")
688         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
689         verify_event(self, e, expected_state=BFDState.up)
690         self.logger.info("BFD: Session is Up")
691         self.test_session.update(state=BFDState.up)
692         self.test_session.send_packet()
693         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
694
695     def test_session_down(self):
696         """ bring BFD session down """
697         bfd_session_up(self)
698         bfd_session_down(self)
699
700     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
701     def test_hold_up(self):
702         """ hold BFD session up """
703         bfd_session_up(self)
704         for dummy in range(self.test_session.detect_mult * 2):
705             wait_for_bfd_packet(self)
706             self.test_session.send_packet()
707         self.assert_equal(len(self.vapi.collect_events()), 0,
708                           "number of bfd events")
709
710     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
711     def test_slow_timer(self):
712         """ verify slow periodic control frames while session down """
713         packet_count = 3
714         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
715         prev_packet = wait_for_bfd_packet(self, 2)
716         for dummy in range(packet_count):
717             next_packet = wait_for_bfd_packet(self, 2)
718             time_diff = next_packet.time - prev_packet.time
719             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
720             # to work around timing issues
721             self.assert_in_range(
722                 time_diff, 0.70, 1.05, "time between slow packets")
723             prev_packet = next_packet
724
725     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
726     def test_zero_remote_min_rx(self):
727         """ no packets when zero remote required min rx interval """
728         bfd_session_up(self)
729         self.test_session.update(required_min_rx=0)
730         self.test_session.send_packet()
731         for dummy in range(self.test_session.detect_mult):
732             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
733                        "sleep before transmitting bfd packet")
734             self.test_session.send_packet()
735             try:
736                 p = wait_for_bfd_packet(self, timeout=0)
737                 self.logger.error(ppp("Received unexpected packet:", p))
738             except CaptureTimeoutError:
739                 pass
740         self.assert_equal(
741             len(self.vapi.collect_events()), 0, "number of bfd events")
742         self.test_session.update(required_min_rx=300000)
743         for dummy in range(3):
744             self.test_session.send_packet()
745             wait_for_bfd_packet(
746                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
747         self.assert_equal(
748             len(self.vapi.collect_events()), 0, "number of bfd events")
749
750     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
751     def test_conn_down(self):
752         """ verify session goes down after inactivity """
753         bfd_session_up(self)
754         detection_time = self.test_session.detect_mult *\
755             self.vpp_session.required_min_rx / USEC_IN_SEC
756         self.sleep(detection_time, "waiting for BFD session time-out")
757         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
758         verify_event(self, e, expected_state=BFDState.down)
759
760     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
761     def test_large_required_min_rx(self):
762         """ large remote required min rx interval """
763         bfd_session_up(self)
764         p = wait_for_bfd_packet(self)
765         interval = 3000000
766         self.test_session.update(required_min_rx=interval)
767         self.test_session.send_packet()
768         time_mark = time.time()
769         count = 0
770         # busy wait here, trying to collect a packet or event, vpp is not
771         # allowed to send packets and the session will timeout first - so the
772         # Up->Down event must arrive before any packets do
773         while time.time() < time_mark + interval / USEC_IN_SEC:
774             try:
775                 p = wait_for_bfd_packet(self, timeout=0)
776                 # if vpp managed to send a packet before we did the session
777                 # session update, then that's fine, ignore it
778                 if p.time < time_mark - self.vpp_clock_offset:
779                     continue
780                 self.logger.error(ppp("Received unexpected packet:", p))
781                 count += 1
782             except CaptureTimeoutError:
783                 pass
784             events = self.vapi.collect_events()
785             if len(events) > 0:
786                 verify_event(self, events[0], BFDState.down)
787                 break
788         self.assert_equal(count, 0, "number of packets received")
789
790     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
791     def test_immediate_remote_min_rx_reduction(self):
792         """ immediately honor remote required min rx reduction """
793         self.vpp_session.remove_vpp_config()
794         self.vpp_session = VppBFDUDPSession(
795             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
796         self.pg0.enable_capture()
797         self.vpp_session.add_vpp_config()
798         self.test_session.update(desired_min_tx=1000000,
799                                  required_min_rx=1000000)
800         bfd_session_up(self)
801         reference_packet = wait_for_bfd_packet(self)
802         time_mark = time.time()
803         interval = 300000
804         self.test_session.update(required_min_rx=interval)
805         self.test_session.send_packet()
806         extra_time = time.time() - time_mark
807         p = wait_for_bfd_packet(self)
808         # first packet is allowed to be late by time we spent doing the update
809         # calculated in extra_time
810         self.assert_in_range(p.time - reference_packet.time,
811                              .95 * 0.75 * interval / USEC_IN_SEC,
812                              1.05 * interval / USEC_IN_SEC + extra_time,
813                              "time between BFD packets")
814         reference_packet = p
815         for dummy in range(3):
816             p = wait_for_bfd_packet(self)
817             diff = p.time - reference_packet.time
818             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
819                                  1.05 * interval / USEC_IN_SEC,
820                                  "time between BFD packets")
821             reference_packet = p
822
823     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
824     def test_modify_req_min_rx_double(self):
825         """ modify session - double required min rx """
826         bfd_session_up(self)
827         p = wait_for_bfd_packet(self)
828         self.test_session.update(desired_min_tx=10000,
829                                  required_min_rx=10000)
830         self.test_session.send_packet()
831         # double required min rx
832         self.vpp_session.modify_parameters(
833             required_min_rx=2 * self.vpp_session.required_min_rx)
834         p = wait_for_bfd_packet(
835             self, pcap_time_min=time.time() - self.vpp_clock_offset)
836         # poll bit needs to be set
837         self.assertIn("P", p.sprintf("%BFD.flags%"),
838                       "Poll bit not set in BFD packet")
839         # finish poll sequence with final packet
840         final = self.test_session.create_packet()
841         final[BFD].flags = "F"
842         timeout = self.test_session.detect_mult * \
843             max(self.test_session.desired_min_tx,
844                 self.vpp_session.required_min_rx) / USEC_IN_SEC
845         self.test_session.send_packet(final)
846         time_mark = time.time()
847         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
848         verify_event(self, e, expected_state=BFDState.down)
849         time_to_event = time.time() - time_mark
850         self.assert_in_range(time_to_event, .9 * timeout,
851                              1.1 * timeout, "session timeout")
852
853     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
854     def test_modify_req_min_rx_halve(self):
855         """ modify session - halve required min rx """
856         self.vpp_session.modify_parameters(
857             required_min_rx=2 * self.vpp_session.required_min_rx)
858         bfd_session_up(self)
859         p = wait_for_bfd_packet(self)
860         self.test_session.update(desired_min_tx=10000,
861                                  required_min_rx=10000)
862         self.test_session.send_packet()
863         p = wait_for_bfd_packet(
864             self, pcap_time_min=time.time() - self.vpp_clock_offset)
865         # halve required min rx
866         old_required_min_rx = self.vpp_session.required_min_rx
867         self.vpp_session.modify_parameters(
868             required_min_rx=0.5 * self.vpp_session.required_min_rx)
869         # now we wait 0.8*3*old-req-min-rx and the session should still be up
870         self.sleep(0.8 * self.vpp_session.detect_mult *
871                    old_required_min_rx / USEC_IN_SEC,
872                    "wait before finishing poll sequence")
873         self.assert_equal(len(self.vapi.collect_events()), 0,
874                           "number of bfd events")
875         p = wait_for_bfd_packet(self)
876         # poll bit needs to be set
877         self.assertIn("P", p.sprintf("%BFD.flags%"),
878                       "Poll bit not set in BFD packet")
879         # finish poll sequence with final packet
880         final = self.test_session.create_packet()
881         final[BFD].flags = "F"
882         self.test_session.send_packet(final)
883         # now the session should time out under new conditions
884         detection_time = self.test_session.detect_mult *\
885             self.vpp_session.required_min_rx / USEC_IN_SEC
886         before = time.time()
887         e = self.vapi.wait_for_event(
888             2 * detection_time, "bfd_udp_session_details")
889         after = time.time()
890         self.assert_in_range(after - before,
891                              0.9 * detection_time,
892                              1.1 * detection_time,
893                              "time before bfd session goes down")
894         verify_event(self, e, expected_state=BFDState.down)
895
896     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
897     def test_modify_detect_mult(self):
898         """ modify detect multiplier """
899         bfd_session_up(self)
900         p = wait_for_bfd_packet(self)
901         self.vpp_session.modify_parameters(detect_mult=1)
902         p = wait_for_bfd_packet(
903             self, pcap_time_min=time.time() - self.vpp_clock_offset)
904         self.assert_equal(self.vpp_session.detect_mult,
905                           p[BFD].detect_mult,
906                           "detect mult")
907         # poll bit must not be set
908         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
909                          "Poll bit not set in BFD packet")
910         self.vpp_session.modify_parameters(detect_mult=10)
911         p = wait_for_bfd_packet(
912             self, pcap_time_min=time.time() - self.vpp_clock_offset)
913         self.assert_equal(self.vpp_session.detect_mult,
914                           p[BFD].detect_mult,
915                           "detect mult")
916         # poll bit must not be set
917         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
918                          "Poll bit not set in BFD packet")
919
920     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
921     def test_queued_poll(self):
922         """ test poll sequence queueing """
923         bfd_session_up(self)
924         p = wait_for_bfd_packet(self)
925         self.vpp_session.modify_parameters(
926             required_min_rx=2 * self.vpp_session.required_min_rx)
927         p = wait_for_bfd_packet(self)
928         poll_sequence_start = time.time()
929         poll_sequence_length_min = 0.5
930         send_final_after = time.time() + poll_sequence_length_min
931         # poll bit needs to be set
932         self.assertIn("P", p.sprintf("%BFD.flags%"),
933                       "Poll bit not set in BFD packet")
934         self.assert_equal(p[BFD].required_min_rx_interval,
935                           self.vpp_session.required_min_rx,
936                           "BFD required min rx interval")
937         self.vpp_session.modify_parameters(
938             required_min_rx=2 * self.vpp_session.required_min_rx)
939         # 2nd poll sequence should be queued now
940         # don't send the reply back yet, wait for some time to emulate
941         # longer round-trip time
942         packet_count = 0
943         while time.time() < send_final_after:
944             self.test_session.send_packet()
945             p = wait_for_bfd_packet(self)
946             self.assert_equal(len(self.vapi.collect_events()), 0,
947                               "number of bfd events")
948             self.assert_equal(p[BFD].required_min_rx_interval,
949                               self.vpp_session.required_min_rx,
950                               "BFD required min rx interval")
951             packet_count += 1
952             # poll bit must be set
953             self.assertIn("P", p.sprintf("%BFD.flags%"),
954                           "Poll bit not set in BFD packet")
955         final = self.test_session.create_packet()
956         final[BFD].flags = "F"
957         self.test_session.send_packet(final)
958         # finish 1st with final
959         poll_sequence_length = time.time() - poll_sequence_start
960         # vpp must wait for some time before starting new poll sequence
961         poll_no_2_started = False
962         for dummy in range(2 * packet_count):
963             p = wait_for_bfd_packet(self)
964             self.assert_equal(len(self.vapi.collect_events()), 0,
965                               "number of bfd events")
966             if "P" in p.sprintf("%BFD.flags%"):
967                 poll_no_2_started = True
968                 if time.time() < poll_sequence_start + poll_sequence_length:
969                     raise Exception("VPP started 2nd poll sequence too soon")
970                 final = self.test_session.create_packet()
971                 final[BFD].flags = "F"
972                 self.test_session.send_packet(final)
973                 break
974             else:
975                 self.test_session.send_packet()
976         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
977         # finish 2nd with final
978         final = self.test_session.create_packet()
979         final[BFD].flags = "F"
980         self.test_session.send_packet(final)
981         p = wait_for_bfd_packet(self)
982         # poll bit must not be set
983         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
984                          "Poll bit set in BFD packet")
985
986     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
987     def test_poll_response(self):
988         """ test correct response to control frame with poll bit set """
989         bfd_session_up(self)
990         poll = self.test_session.create_packet()
991         poll[BFD].flags = "P"
992         self.test_session.send_packet(poll)
993         final = wait_for_bfd_packet(
994             self, pcap_time_min=time.time() - self.vpp_clock_offset)
995         self.assertIn("F", final.sprintf("%BFD.flags%"))
996
997     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
998     def test_no_periodic_if_remote_demand(self):
999         """ no periodic frames outside poll sequence if remote demand set """
1000         bfd_session_up(self)
1001         demand = self.test_session.create_packet()
1002         demand[BFD].flags = "D"
1003         self.test_session.send_packet(demand)
1004         transmit_time = 0.9 \
1005             * max(self.vpp_session.required_min_rx,
1006                   self.test_session.desired_min_tx) \
1007             / USEC_IN_SEC
1008         count = 0
1009         for dummy in range(self.test_session.detect_mult * 2):
1010             time.sleep(transmit_time)
1011             self.test_session.send_packet(demand)
1012             try:
1013                 p = wait_for_bfd_packet(self, timeout=0)
1014                 self.logger.error(ppp("Received unexpected packet:", p))
1015                 count += 1
1016             except CaptureTimeoutError:
1017                 pass
1018         events = self.vapi.collect_events()
1019         for e in events:
1020             self.logger.error("Received unexpected event: %s", e)
1021         self.assert_equal(count, 0, "number of packets received")
1022         self.assert_equal(len(events), 0, "number of events received")
1023
1024     def test_echo_looped_back(self):
1025         """ echo packets looped back """
1026         # don't need a session in this case..
1027         self.vpp_session.remove_vpp_config()
1028         self.pg0.enable_capture()
1029         echo_packet_count = 10
1030         # random source port low enough to increment a few times..
1031         udp_sport_tx = randint(1, 50000)
1032         udp_sport_rx = udp_sport_tx
1033         echo_packet = (Ether(src=self.pg0.remote_mac,
1034                              dst=self.pg0.local_mac) /
1035                        IP(src=self.pg0.remote_ip4,
1036                           dst=self.pg0.remote_ip4) /
1037                        UDP(dport=BFD.udp_dport_echo) /
1038                        Raw("this should be looped back"))
1039         for dummy in range(echo_packet_count):
1040             self.sleep(.01, "delay between echo packets")
1041             echo_packet[UDP].sport = udp_sport_tx
1042             udp_sport_tx += 1
1043             self.logger.debug(ppp("Sending packet:", echo_packet))
1044             self.pg0.add_stream(echo_packet)
1045             self.pg_start()
1046         for dummy in range(echo_packet_count):
1047             p = self.pg0.wait_for_packet(1)
1048             self.logger.debug(ppp("Got packet:", p))
1049             ether = p[Ether]
1050             self.assert_equal(self.pg0.remote_mac,
1051                               ether.dst, "Destination MAC")
1052             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1053             ip = p[IP]
1054             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1055             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1056             udp = p[UDP]
1057             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1058                               "UDP destination port")
1059             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1060             udp_sport_rx += 1
1061             # need to compare the hex payload here, otherwise BFD_vpp_echo
1062             # gets in way
1063             self.assertEqual(str(p[UDP].payload),
1064                              str(echo_packet[UDP].payload),
1065                              "Received packet is not the echo packet sent")
1066         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1067                           "ECHO packet identifier for test purposes)")
1068
1069     def test_echo(self):
1070         """ echo function """
1071         bfd_session_up(self)
1072         self.test_session.update(required_min_echo_rx=150000)
1073         self.test_session.send_packet()
1074         detection_time = self.test_session.detect_mult *\
1075             self.vpp_session.required_min_rx / USEC_IN_SEC
1076         # echo shouldn't work without echo source set
1077         for dummy in range(10):
1078             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1079             self.sleep(sleep, "delay before sending bfd packet")
1080             self.test_session.send_packet()
1081         p = wait_for_bfd_packet(
1082             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1083         self.assert_equal(p[BFD].required_min_rx_interval,
1084                           self.vpp_session.required_min_rx,
1085                           "BFD required min rx interval")
1086         self.test_session.send_packet()
1087         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1088         echo_seen = False
1089         # should be turned on - loopback echo packets
1090         for dummy in range(3):
1091             loop_until = time.time() + 0.75 * detection_time
1092             while time.time() < loop_until:
1093                 p = self.pg0.wait_for_packet(1)
1094                 self.logger.debug(ppp("Got packet:", p))
1095                 if p[UDP].dport == BFD.udp_dport_echo:
1096                     self.assert_equal(
1097                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1098                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1099                                         "BFD ECHO src IP equal to loopback IP")
1100                     self.logger.debug(ppp("Looping back packet:", p))
1101                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1102                                       "ECHO packet destination MAC address")
1103                     p[Ether].dst = self.pg0.local_mac
1104                     self.pg0.add_stream(p)
1105                     self.pg_start()
1106                     echo_seen = True
1107                 elif p.haslayer(BFD):
1108                     if echo_seen:
1109                         self.assertGreaterEqual(
1110                             p[BFD].required_min_rx_interval,
1111                             1000000)
1112                     if "P" in p.sprintf("%BFD.flags%"):
1113                         final = self.test_session.create_packet()
1114                         final[BFD].flags = "F"
1115                         self.test_session.send_packet(final)
1116                 else:
1117                     raise Exception(ppp("Received unknown packet:", p))
1118
1119                 self.assert_equal(len(self.vapi.collect_events()), 0,
1120                                   "number of bfd events")
1121             self.test_session.send_packet()
1122         self.assertTrue(echo_seen, "No echo packets received")
1123
1124     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1125     def test_echo_fail(self):
1126         """ session goes down if echo function fails """
1127         bfd_session_up(self)
1128         self.test_session.update(required_min_echo_rx=150000)
1129         self.test_session.send_packet()
1130         detection_time = self.test_session.detect_mult *\
1131             self.vpp_session.required_min_rx / USEC_IN_SEC
1132         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1133         # echo function should be used now, but we will drop the echo packets
1134         verified_diag = False
1135         for dummy in range(3):
1136             loop_until = time.time() + 0.75 * detection_time
1137             while time.time() < loop_until:
1138                 p = self.pg0.wait_for_packet(1)
1139                 self.logger.debug(ppp("Got packet:", p))
1140                 if p[UDP].dport == BFD.udp_dport_echo:
1141                     # dropped
1142                     pass
1143                 elif p.haslayer(BFD):
1144                     if "P" in p.sprintf("%BFD.flags%"):
1145                         self.assertGreaterEqual(
1146                             p[BFD].required_min_rx_interval,
1147                             1000000)
1148                         final = self.test_session.create_packet()
1149                         final[BFD].flags = "F"
1150                         self.test_session.send_packet(final)
1151                     if p[BFD].state == BFDState.down:
1152                         self.assert_equal(p[BFD].diag,
1153                                           BFDDiagCode.echo_function_failed,
1154                                           BFDDiagCode)
1155                         verified_diag = True
1156                 else:
1157                     raise Exception(ppp("Received unknown packet:", p))
1158             self.test_session.send_packet()
1159         events = self.vapi.collect_events()
1160         self.assert_equal(len(events), 1, "number of bfd events")
1161         self.assert_equal(events[0].state, BFDState.down, BFDState)
1162         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1163
1164     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1165     def test_echo_stop(self):
1166         """ echo function stops if peer sets required min echo rx zero """
1167         bfd_session_up(self)
1168         self.test_session.update(required_min_echo_rx=150000)
1169         self.test_session.send_packet()
1170         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1171         # wait for first echo packet
1172         while True:
1173             p = self.pg0.wait_for_packet(1)
1174             self.logger.debug(ppp("Got packet:", p))
1175             if p[UDP].dport == BFD.udp_dport_echo:
1176                 self.logger.debug(ppp("Looping back packet:", p))
1177                 p[Ether].dst = self.pg0.local_mac
1178                 self.pg0.add_stream(p)
1179                 self.pg_start()
1180                 break
1181             elif p.haslayer(BFD):
1182                 # ignore BFD
1183                 pass
1184             else:
1185                 raise Exception(ppp("Received unknown packet:", p))
1186         self.test_session.update(required_min_echo_rx=0)
1187         self.test_session.send_packet()
1188         # echo packets shouldn't arrive anymore
1189         for dummy in range(5):
1190             wait_for_bfd_packet(
1191                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1192             self.test_session.send_packet()
1193             events = self.vapi.collect_events()
1194             self.assert_equal(len(events), 0, "number of bfd events")
1195
1196     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1197     def test_echo_source_removed(self):
1198         """ echo function stops if echo source is removed """
1199         bfd_session_up(self)
1200         self.test_session.update(required_min_echo_rx=150000)
1201         self.test_session.send_packet()
1202         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1203         # wait for first echo packet
1204         while True:
1205             p = self.pg0.wait_for_packet(1)
1206             self.logger.debug(ppp("Got packet:", p))
1207             if p[UDP].dport == BFD.udp_dport_echo:
1208                 self.logger.debug(ppp("Looping back packet:", p))
1209                 p[Ether].dst = self.pg0.local_mac
1210                 self.pg0.add_stream(p)
1211                 self.pg_start()
1212                 break
1213             elif p.haslayer(BFD):
1214                 # ignore BFD
1215                 pass
1216             else:
1217                 raise Exception(ppp("Received unknown packet:", p))
1218         self.vapi.bfd_udp_del_echo_source()
1219         self.test_session.send_packet()
1220         # echo packets shouldn't arrive anymore
1221         for dummy in range(5):
1222             wait_for_bfd_packet(
1223                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1224             self.test_session.send_packet()
1225             events = self.vapi.collect_events()
1226             self.assert_equal(len(events), 0, "number of bfd events")
1227
1228     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1229     def test_stale_echo(self):
1230         """ stale echo packets don't keep a session up """
1231         bfd_session_up(self)
1232         self.test_session.update(required_min_echo_rx=150000)
1233         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1234         self.test_session.send_packet()
1235         # should be turned on - loopback echo packets
1236         echo_packet = None
1237         timeout_at = None
1238         timeout_ok = False
1239         for dummy in range(10 * self.vpp_session.detect_mult):
1240             p = self.pg0.wait_for_packet(1)
1241             if p[UDP].dport == BFD.udp_dport_echo:
1242                 if echo_packet is None:
1243                     self.logger.debug(ppp("Got first echo packet:", p))
1244                     echo_packet = p
1245                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1246                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1247                 else:
1248                     self.logger.debug(ppp("Got followup echo packet:", p))
1249                 self.logger.debug(ppp("Looping back first echo packet:", p))
1250                 echo_packet[Ether].dst = self.pg0.local_mac
1251                 self.pg0.add_stream(echo_packet)
1252                 self.pg_start()
1253             elif p.haslayer(BFD):
1254                 self.logger.debug(ppp("Got packet:", p))
1255                 if "P" in p.sprintf("%BFD.flags%"):
1256                     final = self.test_session.create_packet()
1257                     final[BFD].flags = "F"
1258                     self.test_session.send_packet(final)
1259                 if p[BFD].state == BFDState.down:
1260                     self.assertIsNotNone(
1261                         timeout_at,
1262                         "Session went down before first echo packet received")
1263                     now = time.time()
1264                     self.assertGreaterEqual(
1265                         now, timeout_at,
1266                         "Session timeout at %s, but is expected at %s" %
1267                         (now, timeout_at))
1268                     self.assert_equal(p[BFD].diag,
1269                                       BFDDiagCode.echo_function_failed,
1270                                       BFDDiagCode)
1271                     events = self.vapi.collect_events()
1272                     self.assert_equal(len(events), 1, "number of bfd events")
1273                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1274                     timeout_ok = True
1275                     break
1276             else:
1277                 raise Exception(ppp("Received unknown packet:", p))
1278             self.test_session.send_packet()
1279         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1280
1281     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1282     def test_invalid_echo_checksum(self):
1283         """ echo packets with invalid checksum don't keep a session up """
1284         bfd_session_up(self)
1285         self.test_session.update(required_min_echo_rx=150000)
1286         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1287         self.test_session.send_packet()
1288         # should be turned on - loopback echo packets
1289         timeout_at = None
1290         timeout_ok = False
1291         for dummy in range(10 * self.vpp_session.detect_mult):
1292             p = self.pg0.wait_for_packet(1)
1293             if p[UDP].dport == BFD.udp_dport_echo:
1294                 self.logger.debug(ppp("Got echo packet:", p))
1295                 if timeout_at is None:
1296                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1297                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1298                 p[BFD_vpp_echo].checksum = getrandbits(64)
1299                 p[Ether].dst = self.pg0.local_mac
1300                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1301                 self.pg0.add_stream(p)
1302                 self.pg_start()
1303             elif p.haslayer(BFD):
1304                 self.logger.debug(ppp("Got packet:", p))
1305                 if "P" in p.sprintf("%BFD.flags%"):
1306                     final = self.test_session.create_packet()
1307                     final[BFD].flags = "F"
1308                     self.test_session.send_packet(final)
1309                 if p[BFD].state == BFDState.down:
1310                     self.assertIsNotNone(
1311                         timeout_at,
1312                         "Session went down before first echo packet received")
1313                     now = time.time()
1314                     self.assertGreaterEqual(
1315                         now, timeout_at,
1316                         "Session timeout at %s, but is expected at %s" %
1317                         (now, timeout_at))
1318                     self.assert_equal(p[BFD].diag,
1319                                       BFDDiagCode.echo_function_failed,
1320                                       BFDDiagCode)
1321                     events = self.vapi.collect_events()
1322                     self.assert_equal(len(events), 1, "number of bfd events")
1323                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1324                     timeout_ok = True
1325                     break
1326             else:
1327                 raise Exception(ppp("Received unknown packet:", p))
1328             self.test_session.send_packet()
1329         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1330
1331     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1332     def test_admin_up_down(self):
1333         """ put session admin-up and admin-down """
1334         bfd_session_up(self)
1335         self.vpp_session.admin_down()
1336         self.pg0.enable_capture()
1337         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1338         verify_event(self, e, expected_state=BFDState.admin_down)
1339         for dummy in range(2):
1340             p = wait_for_bfd_packet(self)
1341             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1342         # try to bring session up - shouldn't be possible
1343         self.test_session.update(state=BFDState.init)
1344         self.test_session.send_packet()
1345         for dummy in range(2):
1346             p = wait_for_bfd_packet(self)
1347             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1348         self.vpp_session.admin_up()
1349         self.test_session.update(state=BFDState.down)
1350         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1351         verify_event(self, e, expected_state=BFDState.down)
1352         p = wait_for_bfd_packet(
1353             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1354         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1355         self.test_session.send_packet()
1356         p = wait_for_bfd_packet(
1357             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1358         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1359         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1360         verify_event(self, e, expected_state=BFDState.init)
1361         self.test_session.update(state=BFDState.up)
1362         self.test_session.send_packet()
1363         p = wait_for_bfd_packet(
1364             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1365         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1366         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1367         verify_event(self, e, expected_state=BFDState.up)
1368
1369     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1370     def test_config_change_remote_demand(self):
1371         """ configuration change while peer in demand mode """
1372         bfd_session_up(self)
1373         demand = self.test_session.create_packet()
1374         demand[BFD].flags = "D"
1375         self.test_session.send_packet(demand)
1376         self.vpp_session.modify_parameters(
1377             required_min_rx=2 * self.vpp_session.required_min_rx)
1378         p = wait_for_bfd_packet(
1379             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1380         # poll bit must be set
1381         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1382         # terminate poll sequence
1383         final = self.test_session.create_packet()
1384         final[BFD].flags = "D+F"
1385         self.test_session.send_packet(final)
1386         # vpp should be quiet now again
1387         transmit_time = 0.9 \
1388             * max(self.vpp_session.required_min_rx,
1389                   self.test_session.desired_min_tx) \
1390             / USEC_IN_SEC
1391         count = 0
1392         for dummy in range(self.test_session.detect_mult * 2):
1393             time.sleep(transmit_time)
1394             self.test_session.send_packet(demand)
1395             try:
1396                 p = wait_for_bfd_packet(self, timeout=0)
1397                 self.logger.error(ppp("Received unexpected packet:", p))
1398                 count += 1
1399             except CaptureTimeoutError:
1400                 pass
1401         events = self.vapi.collect_events()
1402         for e in events:
1403             self.logger.error("Received unexpected event: %s", e)
1404         self.assert_equal(count, 0, "number of packets received")
1405         self.assert_equal(len(events), 0, "number of events received")
1406
1407     def test_intf_deleted(self):
1408         """ interface with bfd session deleted """
1409         intf = VppLoInterface(self, 0)
1410         intf.config_ip4()
1411         intf.admin_up()
1412         sw_if_index = intf.sw_if_index
1413         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1414         vpp_session.add_vpp_config()
1415         vpp_session.admin_up()
1416         intf.remove_vpp_config()
1417         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1418         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1419         self.assertFalse(vpp_session.query_vpp_config())
1420
1421
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # pg interface carrying the BFD session (configured in setUpClass)
    pg0 = None
    # read by the tests when computing pcap_time_min; not assigned in this
    # class - presumably set by a session bring-up helper, verify there
    vpp_clock_offset = None
    # VPP-side session object, created per test in setUp
    vpp_session = None
    # tester-side peer session, created per test in setUp
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ configure pg0 and loopback0 for IPv6 and bring them up """
        super(BFD6TestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces([0])
            # loopback0 serves as the echo source in test_echo
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            # clean up the class-level setup before propagating the failure
            super(BFD6TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ enable BFD events and create the VPP and test sessions """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except:
            # whatever went wrong, switch the events off again and let the
            # original error propagate
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """ disable BFD events (if VPP is alive) and drain the queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair

        The first control frame is sent with a random my_discriminator
        before anything has been received from VPP. VPP must answer in
        init state echoing that discriminator; the handshake is then
        completed and both the init and the up event are verified.
        """
        self.logger.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        # learn VPP's discriminator from its reply
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_hold_up(self):
        """ hold BFD session up

        Answers every packet from VPP for 2 * detect_mult rounds and
        verifies that no event was generated and the session stayed up.
        """
        bfd_session_up(self)
        for dummy in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back

        Sends echo packets with increasing UDP source ports (used as a
        packet identifier) and verifies that each one comes back with
        swapped MACs, unchanged IPs, ports and payload, in order.
        """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        # source and destination IP are both the remote address
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        for dummy in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for dummy in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(str(p[UDP].payload),
                             str(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # all packets sent were received - the counters must have met
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function

        Without an echo source configured VPP must keep advertising its
        configured required_min_rx. Once the echo source is set, echo
        packets are expected on pg0; they are looped back, and control
        packets seen afterwards must advertise a required min rx
        interval of at least one second. No events may be generated.
        """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # send the echo packet back to VPP unchanged
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        # with echo active, control packets are expected
                        # to advertise at least one second (in usec)
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            USEC_IN_SEC)
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer the poll with a final
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

    def test_intf_deleted(self):
        """ interface with bfd session deleted

        Removing an interface that carries a BFD session must produce an
        event with that interface's sw_if_index and remove the session
        from VPP.
        """
        intf = VppLoInterface(self, 0)
        intf.config_ip6()
        intf.admin_up()
        # remember the index before the interface goes away
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(
            self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
1629
1630
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # VPP-side session object, created inside the test
    vpp_session = None
    # tester-side peer session, created inside the test
    test_session = None

    def setUp(self):
        """ create pg0, enable BFD events and configure IPv6 on pg0 """
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    def tearDown(self):
        """ disable BFD events, unless VPP has died """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        if p.haslayer(BFD) or is_ipv6_misc(p):
            return True
        return False

    def test_session_with_fib(self):
        """ BFD-FIB interactions

        Two routes are resolved via the BFD peer. Traffic matching them
        must be forwarded while the session is up, dropped while it is
        down, and forwarded again once the session is back up.
        """

        # packets to match against both of the routes
        p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2001::1") /
              UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100)),
             (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2002::1") /
              UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100))]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                self.pg0.sw_if_index,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        # no interface given (0xffffffff) - the recursive one
        ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                0xffffffff,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        ip_2001_s_64.add_vpp_config()
        ip_2002_s_64.add_vpp_config()

        # bring the session up now the routes are present
        self.vpp_session = VppBFDUDPSession(self,
                                            self.pg0,
                                            self.pg0.remote_ip6,
                                            af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)

        self.pg0.add_stream(p)
        self.pg_start()
        for packet in p:
            captured = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(captured[IPv6].dst,
                             packet[IPv6].dst)

        # session is down - traffic is dropped
        bfd_session_down(self)

        self.pg0.add_stream(p)
        self.pg_start()
        # nothing but BFD/ND/RA may show up in the capture
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)

        # session is up - traffic passes
        bfd_session_up(self)

        self.pg0.add_stream(p)
        self.pg_start()
        for packet in p:
            captured = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(captured[IPv6].dst,
                             packet[IPv6].dst)
1731
1732
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDSHA1TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """

    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create pg0, configure IPv4 on it and resolve ARP """
        super(BFDSHA1TestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()
        except Exception:
            # clean up the class-level setup before propagating the failure
            super(BFDSHA1TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ fresh key factory, BFD events enabled, capture on pg0 """
        super(BFDSHA1TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ disable BFD events (if VPP is alive) and drain the queue """
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=0)
        # clear the event queue
        self.vapi.collect_events()
        super(BFDSHA1TestCase, self).tearDown()

    def _bring_up_authenticated_session(self, auth_type=None, admin_up=True,
                                        **test_session_args):
        """ create key, VPP session and test session, then bring it up

        :param auth_type: passed to the key factory when given, otherwise
            the factory's default auth type is used
        :param admin_up: whether to call admin_up() on the VPP session
        :param test_session_args: extra keyword arguments for the
            BFDTestSession constructor
        """
        if auth_type is None:
            key = self.factory.create_random_key(self)
        else:
            key = self.factory.create_random_key(self, auth_type)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        if admin_up:
            self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id, **test_session_args)
        bfd_session_up(self)

    def test_session_up(self):
        """ bring BFD session up """
        self._bring_up_authenticated_session()

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_hold_up(self):
        """ hold BFD session up

        Answers every packet from VPP for 2 * detect_mult rounds and
        verifies that the session stayed up.
        """
        self._bring_up_authenticated_session()
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_hold_up_meticulous(self):
        """ hold BFD session up - meticulous auth

        The test session's sequence number starts shortly below
        0xFFFFFFFF and is incremented for every packet sent.
        """
        # specify sequence number so that it wraps
        self._bring_up_authenticated_session(
            auth_type=BFDAuthType.meticulous_keyed_sha1,
            our_seq_number=0xFFFFFFFF - 4)
        for _ in range(30):
            wait_for_bfd_packet(self)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_send_bad_seq_number(self):
        """ session is not kept alive by msgs with bad sequence numbers"""
        self._bring_up_authenticated_session(
            auth_type=BFDAuthType.meticulous_keyed_sha1, admin_up=False)
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # keep sending for two detection times without ever touching the
        # sequence number
        stop_at = time.time() + 2 * detection_time
        while time.time() < stop_at:
            self.test_session.send_packet()
            self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
                       "time between bfd packets")
        events = self.vapi.collect_events()
        # session should be down now, because the sequence numbers weren't
        # updated
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)

    def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                       legitimate_test_session,
                                       rogue_test_session,
                                       rogue_bfd_values=None):
        """ execute a rogue session interaction scenario

        1. add config for the supplied vpp session
        2. bring the legitimate session up
        3. copy the bfd values from legitimate session to rogue session
        4. apply rogue_bfd_values to rogue session
        5. set rogue session state to down
        6. send message to take the session down from the rogue session
        7. assert that the legitimate session is unaffected
        """

        self.vpp_session = vpp_bfd_udp_session
        self.vpp_session.add_vpp_config()
        self.test_session = legitimate_test_session
        # bring vpp session up
        bfd_session_up(self)
        # make the rogue session look like the legitimate one
        legit = self.test_session
        cloned_fields = ("my_discriminator", "your_discriminator",
                         "desired_min_tx", "required_min_rx",
                         "detect_mult", "diag", "state", "auth_type")
        cloned = {}
        for field in cloned_fields:
            cloned[field] = getattr(legit, field)
        rogue_test_session.update(**cloned)
        if rogue_bfd_values:
            rogue_test_session.update(**rogue_bfd_values)
        # send packet from rogue session
        rogue_test_session.update(state=BFDState.down)
        rogue_test_session.send_packet()
        wait_for_bfd_packet(self)
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_mismatch_auth(self):
        """ session is not brought down by unauthenticated msg """
        sha1_key = self.factory.create_random_key(self)
        sha1_key.add_vpp_config()
        vpp_side = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
        legit = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=sha1_key,
            bfd_key_id=vpp_side.bfd_key_id)
        # the rogue session carries no authentication at all
        rogue = BFDTestSession(self, self.pg0, AF_INET)
        self.execute_rogue_session_scenario(vpp_side, legit, rogue)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_mismatch_bfd_key_id(self):
        """ session is not brought down by msg with non-existent key-id """
        sha1_key = self.factory.create_random_key(self)
        sha1_key.add_vpp_config()
        vpp_side = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
        # pick a different random bfd key id
        other_key_id = randint(0, 255)
        while other_key_id == vpp_side.bfd_key_id:
            other_key_id = randint(0, 255)
        legit = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=sha1_key,
            bfd_key_id=vpp_side.bfd_key_id)
        rogue = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=sha1_key,
            bfd_key_id=other_key_id)
        self.execute_rogue_session_scenario(vpp_side, legit, rogue)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_mismatched_auth_type(self):
        """ session is not brought down by msg with wrong auth type """
        sha1_key = self.factory.create_random_key(self)
        sha1_key.add_vpp_config()
        vpp_side = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
        legit = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=sha1_key,
            bfd_key_id=vpp_side.bfd_key_id)
        rogue = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=sha1_key,
            bfd_key_id=vpp_side.bfd_key_id)
        # same key and key id, but the rogue claims a different auth type
        overrides = {'auth_type': BFDAuthType.keyed_md5}
        self.execute_rogue_session_scenario(
            vpp_side, legit, rogue, overrides)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_restart(self):
        """ simulate remote peer restart and resynchronization """
        self._bring_up_authenticated_session(
            auth_type=BFDAuthType.meticulous_keyed_sha1, admin_up=False,
            our_seq_number=0)
        # don't send any packets for 2*detection_time
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(2 * detection_time, "simulating peer restart")
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)
        self.test_session.update(state=BFDState.down)
        # reset sequence number
        self.test_session.our_seq_number = 0
        self.test_session.vpp_seq_number = None
        # now throw away any pending packets
        self.pg0.enable_capture()
        bfd_session_up(self)
1964
1965
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # pg0 is created once per class in setUpClass; the two session
    # attributes are (re)assigned by every test before use
    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        # one packet-generator interface with IPv4 configured is all the
        # tests in this class need
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()
        except Exception:
            # run the base class teardown before propagating the failure
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    def setUp(self):
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        # subscribe to BFD events and start capturing on pg0
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        # unsubscribing needs a live VPP; draining the queue is done always
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_packets(self, check_up=True, bump_seq=False):
        """ wait for and answer 2 * detect_mult BFD packets

        :param check_up: assert that each received packet reports state up
        :param bump_seq: call inc_seq_num() on the test session before
                         each reply is sent
        """
        for _ in range(self.test_session.detect_mult * 2):
            received = wait_for_bfd_packet(self)
            if check_up:
                self.assert_equal(received[BFD].state, BFDState.up, BFDState)
            if bump_seq:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _verify_session_undisturbed(self):
        """ assert the VPP session is up and no BFD event is queued """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        # the session starts out unauthenticated on both sides
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets()
        # activate the key in VPP and mirror it on the test peer at once
        self.vpp_session.activate_auth(auth_key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = auth_key
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        # the session starts out authenticated on both sides
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=auth_key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets(bump_seq=True)
        # drop authentication in VPP and on the test peer at once
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets(bump_seq=True)
        self._verify_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        old_key = self.factory.create_random_key(self)
        old_key.add_vpp_config()
        new_key = self.factory.create_random_key(self)
        new_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=old_key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        # switch both sides over to the second key at once
        self.vpp_session.activate_auth(new_key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = new_key
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # this first round only answers the packets, without state checks
        self._exchange_packets(check_up=False)
        self.vpp_session.activate_auth(auth_key, delayed=True)
        # the test peer keeps sending unauthenticated packets for a while
        self._exchange_packets()
        # now the test peer starts using the key as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = auth_key
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=auth_key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test peer keeps sending authenticated packets for a while
        self._exchange_packets()
        # now the test peer stops authenticating as well
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        old_key = self.factory.create_random_key(self)
        old_key.add_vpp_config()
        new_key = self.factory.create_random_key(self)
        new_key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=old_key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(new_key, delayed=True)
        # the test peer keeps using the first key for a while
        self._exchange_packets()
        # now the test peer switches to the second key as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = new_key
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()
2172
2173
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDCLITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (CLI) """
    pg0 = None

    # error replies the CLI is expected to print, reproduced verbatim
    _ERR_KEY_IN_USE = (
        "bfd key set: `bfd_auth_set_key' API call failed, "
        "rv=-103:BFD object in use")
    _ERR_DUPLICATE_SESSION = (
        "bfd udp session add: `bfd_add_add_session' API call"
        " failed, rv=-101:Duplicate BFD object")
    _ERR_NO_SUCH_SESSION = (
        "bfd udp session del: `bfd_udp_del_session' API call"
        " failed, rv=-102:No such BFD object")

    @classmethod
    def setUpClass(cls):
        # one packet-generator interface with both IPv4 and IPv6 configured
        super(BFDCLITestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces((0,))
            cls.pg0.config_ip4()
            cls.pg0.config_ip6()
            cls.pg0.resolve_arp()
            cls.pg0.resolve_ndp()
        except Exception:
            # run the base class teardown before propagating the failure
            super(BFDCLITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()

    def tearDown(self):
        try:
            self.vapi.want_bfd_events(enable_disable=0)
        except UnexpectedApiReturnValueError:
            # some tests aren't subscribed, so this is not an issue
            pass
        self.vapi.collect_events()  # clear the event queue
        super(BFDCLITestCase, self).tearDown()

    @staticmethod
    def _secret_as_hex(auth_key):
        """ render the secret of auth_key as two hex digits per character """
        return "".join("{:02x}".format(ord(char)) for char in auth_key.key)

    def cli_verify_no_response(self, cli):
        """ execute a CLI, asserting that the response is empty """
        reply = self.vapi.cli(cli)
        self.assert_equal(reply, "", "CLI command response")

    def cli_verify_response(self, cli, expected):
        """ execute a CLI, asserting that the response matches expectation """
        # surrounding whitespace of the reply is ignored
        reply = self.vapi.cli(cli).strip()
        self.assert_equal(reply, expected, "CLI command response")

    def test_show(self):
        """ show commands """
        sha1_key = self.factory.create_random_key(self)
        sha1_key.add_vpp_config()
        meticulous_key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        meticulous_key.add_vpp_config()
        ip4_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        ip4_session.add_vpp_config()
        ip6_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
            sha1_key=meticulous_key)
        ip6_session.add_vpp_config()
        # the output is only logged, it is not verified
        for show_cmd in ("show bfd keys", "show bfd sessions", "show bfd"):
            self.logger.info(self.vapi.ppcli(show_cmd))

    def test_set_del_sha1_key(self):
        """ set/delete SHA1 auth key """
        set_key_fmt = "bfd key set conf-key-id %s type keyed-sha1 secret %s"
        auth_key = self.factory.create_random_key(self)
        self.registry.register(auth_key, self.logger)
        self.cli_verify_no_response(
            set_key_fmt % (auth_key.conf_key_id,
                           self._secret_as_hex(auth_key)))
        self.assertTrue(auth_key.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=auth_key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in-use
        other_key = self.factory.create_random_key(self)
        self.cli_verify_response(
            set_key_fmt % (auth_key.conf_key_id,
                           self._secret_as_hex(other_key)),
            self._ERR_KEY_IN_USE)
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        # once the session is gone the key can be deleted
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % auth_key.conf_key_id)
        self.assertFalse(auth_key.query_vpp_config())

    def test_set_del_meticulous_sha1_key(self):
        """ set/delete meticulous SHA1 auth key """
        auth_key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        self.registry.register(auth_key, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
            (auth_key.conf_key_id, self._secret_as_hex(auth_key)))
        self.assertTrue(auth_key.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
            sha1_key=auth_key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET6, sha1_key=auth_key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in-use
        other_key = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (auth_key.conf_key_id, self._secret_as_hex(other_key)),
            self._ERR_KEY_IN_USE)
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        # once the session is gone the key can be deleted
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % auth_key.conf_key_id)
        self.assertFalse(auth_key.query_vpp_config())

    def test_add_mod_del_bfd_udp(self):
        """ create/modify/delete IPv4 BFD UDP session """
        iface = self.pg0.name
        local = self.pg0.local_ip4
        peer = self.pg0.remote_ip4
        # this object only describes the expected config; the session
        # itself is created through the CLI below
        vpp_session = VppBFDUDPSession(self, self.pg0, peer)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = (
            "bfd udp session add interface %s local-addr %s "
            "peer-addr %s desired-min-tx %s required-min-rx %s "
            "detect-mult %s" % (
                iface, local, peer, vpp_session.desired_min_tx,
                vpp_session.required_min_rx, vpp_session.detect_mult))
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(cli_add_cmd, self._ERR_DUPLICATE_SESSION)
        verify_bfd_session_config(self, vpp_session)
        # expected config after the modification: every timing parameter
        # is scaled by a different factor
        mod_session = VppBFDUDPSession(
            self, self.pg0, peer,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult)
        cli_mod_cmd = (
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" % (
                iface, local, peer, mod_session.desired_min_tx,
                mod_session.required_min_rx, mod_session.detect_mult))
        self.cli_verify_no_response(cli_mod_cmd)
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = (
            "bfd udp session del interface %s local-addr %s "
            "peer-addr %s" % (iface, local, peer))
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(cli_del_cmd, self._ERR_NO_SUCH_SESSION)
        self.assertFalse(vpp_session.query_vpp_config())

    def test_add_mod_del_bfd_udp6(self):
        """ create/modify/delete IPv6 BFD UDP session """
        iface = self.pg0.name
        local = self.pg0.local_ip6
        peer = self.pg0.remote_ip6
        # this object only describes the expected config; the session
        # itself is created through the CLI below
        vpp_session = VppBFDUDPSession(self, self.pg0, peer, af=AF_INET6)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = (
            "bfd udp session add interface %s local-addr %s "
            "peer-addr %s desired-min-tx %s required-min-rx %s "
            "detect-mult %s" % (
                iface, local, peer, vpp_session.desired_min_tx,
                vpp_session.required_min_rx, vpp_session.detect_mult))
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(cli_add_cmd, self._ERR_DUPLICATE_SESSION)
        verify_bfd_session_config(self, vpp_session)
        # expected config after the modification: every timing parameter
        # is scaled by a different factor
        mod_session = VppBFDUDPSession(
            self, self.pg0, peer, af=AF_INET6,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult)
        cli_mod_cmd = (
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" % (
                iface, local, peer, mod_session.desired_min_tx,
                mod_session.required_min_rx, mod_session.detect_mult))
        self.cli_verify_no_response(cli_mod_cmd)
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = (
            "bfd udp session del interface %s local-addr %s "
            "peer-addr %s" % (iface, local, peer))
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(cli_del_cmd, self._ERR_NO_SUCH_SESSION)
        self.assertFalse(vpp_session.query_vpp_config())

    def test_add_mod_del_bfd_udp_auth(self):
        """ create/modify/delete IPv4 BFD UDP session (authenticated) """
        iface = self.pg0.name
        local = self.pg0.local_ip4
        peer = self.pg0.remote_ip4
        auth_key = self.factory.create_random_key(self)
        auth_key.add_vpp_config()
        # this object only describes the expected config; the session
        # itself is created through the CLI below
        vpp_session = VppBFDUDPSession(
            self, self.pg0, peer, sha1_key=auth_key)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = (
            "bfd udp session add interface %s local-addr %s "
            "peer-addr %s desired-min-tx %s required-min-rx %s "
            "detect-mult %s conf-key-id %s bfd-key-id %s" % (
                iface, local, peer, vpp_session.desired_min_tx,
                vpp_session.required_min_rx, vpp_session.detect_mult,
                auth_key.conf_key_id, vpp_session.bfd_key_id))
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(cli_add_cmd, self._ERR_DUPLICATE_SESSION)
        verify_bfd_session_config(self, vpp_session)
        # expected config after the modification: same key, every timing
        # parameter scaled by a different factor
        mod_session = VppBFDUDPSession(
            self, self.pg0, peer, sha1_key=auth_key,
            bfd_key_id=vpp_session.bfd_key_id,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult)
        cli_mod_cmd = (
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" % (
                iface, local, peer, mod_session.desired_min_tx,
                mod_session.required_min_rx, mod_session.detect_mult))
        self.cli_verify_no_response(cli_mod_cmd)
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = (
            "bfd udp session del interface %s local-addr %s "
            "peer-addr %s" % (iface, local, peer))
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(cli_del_cmd, self._ERR_NO_SUCH_SESSION)
        self.assertFalse(vpp_session.query_vpp_config())

    def test_add_mod_del_bfd_udp6_auth(self):
        """ create/modify/delete IPv6 BFD UDP session (authenticated) """
        iface = self.pg0.name
        local = self.pg0.local_ip6
        peer = self.pg0.remote_ip6
        auth_key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        auth_key.add_vpp_config()
        # this object only describes the expected config; the session
        # itself is created through the CLI below
        vpp_session = VppBFDUDPSession(
            self, self.pg0, peer, af=AF_INET6, sha1_key=auth_key)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = (
            "bfd udp session add interface %s local-addr %s "
            "peer-addr %s desired-min-tx %s required-min-rx %s "
            "detect-mult %s conf-key-id %s bfd-key-id %s" % (
                iface, local, peer, vpp_session.desired_min_tx,
                vpp_session.required_min_rx, vpp_session.detect_mult,
                auth_key.conf_key_id, vpp_session.bfd_key_id))
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(cli_add_cmd, self._ERR_DUPLICATE_SESSION)
        verify_bfd_session_config(self, vpp_session)
        # expected config after the modification: same key, every timing
        # parameter scaled by a different factor
        mod_session = VppBFDUDPSession(
            self, self.pg0, peer, af=AF_INET6, sha1_key=auth_key,
            bfd_key_id=vpp_session.bfd_key_id,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult)
        cli_mod_cmd = (
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" % (
                iface, local, peer, mod_session.desired_min_tx,
                mod_session.required_min_rx, mod_session.detect_mult))
        self.cli_verify_no_response(cli_mod_cmd)
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = (
            "bfd udp session del interface %s local-addr %s "
            "peer-addr %s" % (iface, local, peer))
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(cli_del_cmd, self._ERR_NO_SUCH_SESSION)
        self.assertFalse(vpp_session.query_vpp_config())

    def test_auth_on_off(self):
        """ turn authentication on and off """
        iface = self.pg0.name
        local = self.pg0.local_ip4
        peer = self.pg0.remote_ip4
        auth_key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        auth_key.add_vpp_config()
        # only the plain session is configured in VPP; auth_session just
        # describes what the config should look like with auth active
        session = VppBFDUDPSession(self, self.pg0, peer)
        auth_session = VppBFDUDPSession(self, self.pg0, peer,
                                        sha1_key=auth_key)
        session.add_vpp_config()
        cli_activate = (
            "bfd udp session auth activate interface %s local-addr %s "
            "peer-addr %s conf-key-id %s bfd-key-id %s" % (
                iface, local, peer, auth_key.conf_key_id,
                auth_session.bfd_key_id))
        # each command is issued twice; the repeat must succeed as well
        for _ in range(2):
            self.cli_verify_no_response(cli_activate)
            verify_bfd_session_config(self, auth_session)
        cli_deactivate = (
            "bfd udp session auth deactivate interface %s local-addr %s "
            "peer-addr %s " % (iface, local, peer))
        for _ in range(2):
            self.cli_verify_no_response(cli_deactivate)
            verify_bfd_session_config(self, session)

    def test_auth_on_off_delayed(self):
        """ turn authentication on and off (delayed) """
        iface = self.pg0.name
        local = self.pg0.local_ip4
        peer = self.pg0.remote_ip4
        auth_key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        auth_key.add_vpp_config()
        # only the plain session is configured in VPP; auth_session just
        # describes what the config should look like with auth active
        session = VppBFDUDPSession(self, self.pg0, peer)
        auth_session = VppBFDUDPSession(self, self.pg0, peer,
                                        sha1_key=auth_key)
        session.add_vpp_config()
        cli_activate = (
            "bfd udp session auth activate interface %s local-addr %s "
            "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes" % (
                iface, local, peer, auth_key.conf_key_id,
                auth_session.bfd_key_id))
        # each command is issued twice; the repeat must succeed as well
        for _ in range(2):
            self.cli_verify_no_response(cli_activate)
            verify_bfd_session_config(self, auth_session)
        cli_deactivate = (
            "bfd udp session auth deactivate interface %s local-addr %s "
            "peer-addr %s delayed yes" % (iface, local, peer))
        for _ in range(2):
            self.cli_verify_no_response(cli_deactivate)
            verify_bfd_session_config(self, session)

    def test_admin_up_down(self):
        """ put session admin-up and admin-down """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        addr_args = (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        cli_down = (
            "bfd udp session set-flags admin down interface %s local-addr %s "
            "peer-addr %s " % addr_args)
        cli_up = (
            "bfd udp session set-flags admin up interface %s local-addr %s "
            "peer-addr %s " % addr_args)
        self.cli_verify_no_response(cli_down)
        verify_bfd_session_config(self, session, state=BFDState.admin_down)
        # admin up is expected to leave the session in state down
        self.cli_verify_no_response(cli_up)
        verify_bfd_session_config(self, session, state=BFDState.down)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        show_cmd = "show bfd echo-source"
        self.create_loopback_interfaces([0])
        self.loopback0 = self.lo_interfaces[0]
        loopback = self.loopback0
        loopback.admin_up()
        self.cli_verify_response(show_cmd, "UDP echo source is not set.")
        self.cli_verify_no_response(
            "bfd udp echo-source set interface %s" % loopback.name)
        # no address is configured on the loopback yet
        self.cli_verify_response(
            show_cmd,
            "UDP echo source is: %s\n"
            "IPv4 address usable as echo source: none\n"
            "IPv6 address usable as echo source: none" % loopback.name)
        loopback.config_ip4()
        # expected echo address: the interface address with its lowest
        # bit flipped
        ip4_word, = unpack("!L", loopback.local_ip4n)
        echo_ip4 = inet_ntop(AF_INET, pack("!L", ip4_word ^ 1))
        self.cli_verify_response(
            show_cmd,
            "UDP echo source is: %s\n"
            "IPv4 address usable as echo source: %s\n"
            "IPv6 address usable as echo source: none" %
            (loopback.name, echo_ip4))
        # same rule for IPv6; the expectation is computed before the
        # address is configured on the interface
        ip6_words = list(unpack("!LLLL", loopback.local_ip6n))
        ip6_words[3] ^= 1
        echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", *ip6_words))
        loopback.config_ip6()
        self.cli_verify_response(
            show_cmd,
            "UDP echo source is: %s\n"
            "IPv4 address usable as echo source: %s\n"
            "IPv6 address usable as echo source: %s" %
            (loopback.name, echo_ip4, echo_ip6))
        self.cli_verify_no_response("bfd udp echo-source del")
        self.cli_verify_response(show_cmd, "UDP echo source is not set.")
2589
# when executed directly as a script, run the tests of this module through
# unittest using VppTestRunner as the test runner
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)