Fix coverity CIDs 161048, 163895
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5 import unittest
6 import hashlib
7 import binascii
8 import time
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17     BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError
20 from util import ppp
21 from vpp_papi_provider import UnexpectedApiReturnValueError
22
23 USEC_IN_SEC = 1000000
24
25
26 class AuthKeyFactory(object):
27     """Factory class for creating auth keys with unique conf key ID"""
28
29     def __init__(self):
30         self._conf_key_ids = {}
31
32     def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
33         """ create a random key with unique conf key id """
34         conf_key_id = randint(0, 0xFFFFFFFF)
35         while conf_key_id in self._conf_key_ids:
36             conf_key_id = randint(0, 0xFFFFFFFF)
37         self._conf_key_ids[conf_key_id] = 1
38         key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
39         return VppBFDAuthKey(test=test, auth_type=auth_type,
40                              conf_key_id=conf_key_id, key=key)
41
42
43 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
44 class BFDAPITestCase(VppTestCase):
45     """Bidirectional Forwarding Detection (BFD) - API"""
46
47     pg0 = None
48     pg1 = None
49
50     @classmethod
51     def setUpClass(cls):
52         super(BFDAPITestCase, cls).setUpClass()
53
54         try:
55             cls.create_pg_interfaces(range(2))
56             for i in cls.pg_interfaces:
57                 i.config_ip4()
58                 i.config_ip6()
59                 i.resolve_arp()
60
61         except Exception:
62             super(BFDAPITestCase, cls).tearDownClass()
63             raise
64
65     def setUp(self):
66         super(BFDAPITestCase, self).setUp()
67         self.factory = AuthKeyFactory()
68
69     def test_add_bfd(self):
70         """ create a BFD session """
71         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
72         session.add_vpp_config()
73         self.logger.debug("Session state is %s", session.state)
74         session.remove_vpp_config()
75         session.add_vpp_config()
76         self.logger.debug("Session state is %s", session.state)
77         session.remove_vpp_config()
78
79     def test_double_add(self):
80         """ create the same BFD session twice (negative case) """
81         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
82         session.add_vpp_config()
83
84         with self.vapi.expect_negative_api_retval():
85             session.add_vpp_config()
86
87         session.remove_vpp_config()
88
89     def test_add_bfd6(self):
90         """ create IPv6 BFD session """
91         session = VppBFDUDPSession(
92             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
93         session.add_vpp_config()
94         self.logger.debug("Session state is %s", session.state)
95         session.remove_vpp_config()
96         session.add_vpp_config()
97         self.logger.debug("Session state is %s", session.state)
98         session.remove_vpp_config()
99
100     def test_mod_bfd(self):
101         """ modify BFD session parameters """
102         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
103                                    desired_min_tx=50000,
104                                    required_min_rx=10000,
105                                    detect_mult=1)
106         session.add_vpp_config()
107         s = session.get_bfd_udp_session_dump_entry()
108         self.assert_equal(session.desired_min_tx,
109                           s.desired_min_tx,
110                           "desired min transmit interval")
111         self.assert_equal(session.required_min_rx,
112                           s.required_min_rx,
113                           "required min receive interval")
114         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
115         session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
116                                   required_min_rx=session.required_min_rx * 2,
117                                   detect_mult=session.detect_mult * 2)
118         s = session.get_bfd_udp_session_dump_entry()
119         self.assert_equal(session.desired_min_tx,
120                           s.desired_min_tx,
121                           "desired min transmit interval")
122         self.assert_equal(session.required_min_rx,
123                           s.required_min_rx,
124                           "required min receive interval")
125         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
126
127     def test_add_sha1_keys(self):
128         """ add SHA1 keys """
129         key_count = 10
130         keys = [self.factory.create_random_key(
131             self) for i in range(0, key_count)]
132         for key in keys:
133             self.assertFalse(key.query_vpp_config())
134         for key in keys:
135             key.add_vpp_config()
136         for key in keys:
137             self.assertTrue(key.query_vpp_config())
138         # remove randomly
139         indexes = range(key_count)
140         shuffle(indexes)
141         removed = []
142         for i in indexes:
143             key = keys[i]
144             key.remove_vpp_config()
145             removed.append(i)
146             for j in range(key_count):
147                 key = keys[j]
148                 if j in removed:
149                     self.assertFalse(key.query_vpp_config())
150                 else:
151                     self.assertTrue(key.query_vpp_config())
152         # should be removed now
153         for key in keys:
154             self.assertFalse(key.query_vpp_config())
155         # add back and remove again
156         for key in keys:
157             key.add_vpp_config()
158         for key in keys:
159             self.assertTrue(key.query_vpp_config())
160         for key in keys:
161             key.remove_vpp_config()
162         for key in keys:
163             self.assertFalse(key.query_vpp_config())
164
165     def test_add_bfd_sha1(self):
166         """ create a BFD session (SHA1) """
167         key = self.factory.create_random_key(self)
168         key.add_vpp_config()
169         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
170                                    sha1_key=key)
171         session.add_vpp_config()
172         self.logger.debug("Session state is %s", session.state)
173         session.remove_vpp_config()
174         session.add_vpp_config()
175         self.logger.debug("Session state is %s", session.state)
176         session.remove_vpp_config()
177
178     def test_double_add_sha1(self):
179         """ create the same BFD session twice (negative case) (SHA1) """
180         key = self.factory.create_random_key(self)
181         key.add_vpp_config()
182         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
183                                    sha1_key=key)
184         session.add_vpp_config()
185         with self.assertRaises(Exception):
186             session.add_vpp_config()
187
188     def test_add_auth_nonexistent_key(self):
189         """ create BFD session using non-existent SHA1 (negative case) """
190         session = VppBFDUDPSession(
191             self, self.pg0, self.pg0.remote_ip4,
192             sha1_key=self.factory.create_random_key(self))
193         with self.assertRaises(Exception):
194             session.add_vpp_config()
195
196     def test_shared_sha1_key(self):
197         """ share single SHA1 key between multiple BFD sessions """
198         key = self.factory.create_random_key(self)
199         key.add_vpp_config()
200         sessions = [
201             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
202                              sha1_key=key),
203             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
204                              sha1_key=key, af=AF_INET6),
205             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
206                              sha1_key=key),
207             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
208                              sha1_key=key, af=AF_INET6)]
209         for s in sessions:
210             s.add_vpp_config()
211         removed = 0
212         for s in sessions:
213             e = key.get_bfd_auth_keys_dump_entry()
214             self.assert_equal(e.use_count, len(sessions) - removed,
215                               "Use count for shared key")
216             s.remove_vpp_config()
217             removed += 1
218         e = key.get_bfd_auth_keys_dump_entry()
219         self.assert_equal(e.use_count, len(sessions) - removed,
220                           "Use count for shared key")
221
222     def test_activate_auth(self):
223         """ activate SHA1 authentication """
224         key = self.factory.create_random_key(self)
225         key.add_vpp_config()
226         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
227         session.add_vpp_config()
228         session.activate_auth(key)
229
230     def test_deactivate_auth(self):
231         """ deactivate SHA1 authentication """
232         key = self.factory.create_random_key(self)
233         key.add_vpp_config()
234         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
235         session.add_vpp_config()
236         session.activate_auth(key)
237         session.deactivate_auth()
238
239     def test_change_key(self):
240         """ change SHA1 key """
241         key1 = self.factory.create_random_key(self)
242         key2 = self.factory.create_random_key(self)
243         while key2.conf_key_id == key1.conf_key_id:
244             key2 = self.factory.create_random_key(self)
245         key1.add_vpp_config()
246         key2.add_vpp_config()
247         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
248                                    sha1_key=key1)
249         session.add_vpp_config()
250         session.activate_auth(key2)
251
252
253 class BFDTestSession(object):
254     """ BFD session as seen from test framework side """
255
256     def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
257                  bfd_key_id=None, our_seq_number=None):
258         self.test = test
259         self.af = af
260         self.sha1_key = sha1_key
261         self.bfd_key_id = bfd_key_id
262         self.interface = interface
263         self.udp_sport = randint(49152, 65535)
264         if our_seq_number is None:
265             self.our_seq_number = randint(0, 40000000)
266         else:
267             self.our_seq_number = our_seq_number
268         self.vpp_seq_number = None
269         self.my_discriminator = 0
270         self.desired_min_tx = 100000
271         self.required_min_rx = 100000
272         self.required_min_echo_rx = None
273         self.detect_mult = detect_mult
274         self.diag = BFDDiagCode.no_diagnostic
275         self.your_discriminator = None
276         self.state = BFDState.down
277         self.auth_type = BFDAuthType.no_auth
278
279     def inc_seq_num(self):
280         """ increment sequence number, wrapping if needed """
281         if self.our_seq_number == 0xFFFFFFFF:
282             self.our_seq_number = 0
283         else:
284             self.our_seq_number += 1
285
286     def update(self, my_discriminator=None, your_discriminator=None,
287                desired_min_tx=None, required_min_rx=None,
288                required_min_echo_rx=None, detect_mult=None,
289                diag=None, state=None, auth_type=None):
290         """ update BFD parameters associated with session """
291         if my_discriminator is not None:
292             self.my_discriminator = my_discriminator
293         if your_discriminator is not None:
294             self.your_discriminator = your_discriminator
295         if required_min_rx is not None:
296             self.required_min_rx = required_min_rx
297         if required_min_echo_rx is not None:
298             self.required_min_echo_rx = required_min_echo_rx
299         if desired_min_tx is not None:
300             self.desired_min_tx = desired_min_tx
301         if detect_mult is not None:
302             self.detect_mult = detect_mult
303         if diag is not None:
304             self.diag = diag
305         if state is not None:
306             self.state = state
307         if auth_type is not None:
308             self.auth_type = auth_type
309
310     def fill_packet_fields(self, packet):
311         """ set packet fields with known values in packet """
312         bfd = packet[BFD]
313         if self.my_discriminator:
314             self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
315                                    self.my_discriminator)
316             bfd.my_discriminator = self.my_discriminator
317         if self.your_discriminator:
318             self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
319                                    self.your_discriminator)
320             bfd.your_discriminator = self.your_discriminator
321         if self.required_min_rx:
322             self.test.logger.debug(
323                 "BFD: setting packet.required_min_rx_interval=%s",
324                 self.required_min_rx)
325             bfd.required_min_rx_interval = self.required_min_rx
326         if self.required_min_echo_rx:
327             self.test.logger.debug(
328                 "BFD: setting packet.required_min_echo_rx=%s",
329                 self.required_min_echo_rx)
330             bfd.required_min_echo_rx_interval = self.required_min_echo_rx
331         if self.desired_min_tx:
332             self.test.logger.debug(
333                 "BFD: setting packet.desired_min_tx_interval=%s",
334                 self.desired_min_tx)
335             bfd.desired_min_tx_interval = self.desired_min_tx
336         if self.detect_mult:
337             self.test.logger.debug(
338                 "BFD: setting packet.detect_mult=%s", self.detect_mult)
339             bfd.detect_mult = self.detect_mult
340         if self.diag:
341             self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
342             bfd.diag = self.diag
343         if self.state:
344             self.test.logger.debug("BFD: setting packet.state=%s", self.state)
345             bfd.state = self.state
346         if self.auth_type:
347             # this is used by a negative test-case
348             self.test.logger.debug("BFD: setting packet.auth_type=%s",
349                                    self.auth_type)
350             bfd.auth_type = self.auth_type
351
352     def create_packet(self):
353         """ create a BFD packet, reflecting the current state of session """
354         if self.sha1_key:
355             bfd = BFD(flags="A")
356             bfd.auth_type = self.sha1_key.auth_type
357             bfd.auth_len = BFD.sha1_auth_len
358             bfd.auth_key_id = self.bfd_key_id
359             bfd.auth_seq_num = self.our_seq_number
360             bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
361         else:
362             bfd = BFD()
363         if self.af == AF_INET6:
364             packet = (Ether(src=self.interface.remote_mac,
365                             dst=self.interface.local_mac) /
366                       IPv6(src=self.interface.remote_ip6,
367                            dst=self.interface.local_ip6,
368                            hlim=255) /
369                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
370                       bfd)
371         else:
372             packet = (Ether(src=self.interface.remote_mac,
373                             dst=self.interface.local_mac) /
374                       IP(src=self.interface.remote_ip4,
375                          dst=self.interface.local_ip4,
376                          ttl=255) /
377                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
378                       bfd)
379         self.test.logger.debug("BFD: Creating packet")
380         self.fill_packet_fields(packet)
381         if self.sha1_key:
382             hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
383                 "\0" * (20 - len(self.sha1_key.key))
384             self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
385                                    hashlib.sha1(hash_material).hexdigest())
386             packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
387         return packet
388
389     def send_packet(self, packet=None, interface=None):
390         """ send packet on interface, creating the packet if needed """
391         if packet is None:
392             packet = self.create_packet()
393         if interface is None:
394             interface = self.test.pg0
395         self.test.logger.debug(ppp("Sending packet:", packet))
396         interface.add_stream(packet)
397         self.test.pg_start()
398
399     def verify_sha1_auth(self, packet):
400         """ Verify correctness of authentication in BFD layer. """
401         bfd = packet[BFD]
402         self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
403         self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
404                                BFDAuthType)
405         self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
406         self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
407         if self.vpp_seq_number is None:
408             self.vpp_seq_number = bfd.auth_seq_num
409             self.test.logger.debug("Received initial sequence number: %s" %
410                                    self.vpp_seq_number)
411         else:
412             recvd_seq_num = bfd.auth_seq_num
413             self.test.logger.debug("Received followup sequence number: %s" %
414                                    recvd_seq_num)
415             if self.vpp_seq_number < 0xffffffff:
416                 if self.sha1_key.auth_type == \
417                         BFDAuthType.meticulous_keyed_sha1:
418                     self.test.assert_equal(recvd_seq_num,
419                                            self.vpp_seq_number + 1,
420                                            "BFD sequence number")
421                 else:
422                     self.test.assert_in_range(recvd_seq_num,
423                                               self.vpp_seq_number,
424                                               self.vpp_seq_number + 1,
425                                               "BFD sequence number")
426             else:
427                 if self.sha1_key.auth_type == \
428                         BFDAuthType.meticulous_keyed_sha1:
429                     self.test.assert_equal(recvd_seq_num, 0,
430                                            "BFD sequence number")
431                 else:
432                     self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
433                                        "BFD sequence number not one of "
434                                        "(%s, 0)" % self.vpp_seq_number)
435             self.vpp_seq_number = recvd_seq_num
436         # last 20 bytes represent the hash - so replace them with the key,
437         # pad the result with zeros and hash the result
438         hash_material = bfd.original[:-20] + self.sha1_key.key + \
439             "\0" * (20 - len(self.sha1_key.key))
440         expected_hash = hashlib.sha1(hash_material).hexdigest()
441         self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
442                                expected_hash, "Auth key hash")
443
444     def verify_bfd(self, packet):
445         """ Verify correctness of BFD layer. """
446         bfd = packet[BFD]
447         self.test.assert_equal(bfd.version, 1, "BFD version")
448         self.test.assert_equal(bfd.your_discriminator,
449                                self.my_discriminator,
450                                "BFD - your discriminator")
451         if self.sha1_key:
452             self.verify_sha1_auth(packet)
453
454
455 def bfd_session_up(test):
456     """ Bring BFD session up """
457     test.logger.info("BFD: Waiting for slow hello")
458     p = wait_for_bfd_packet(test, 2)
459     old_offset = None
460     if hasattr(test, 'vpp_clock_offset'):
461         old_offset = test.vpp_clock_offset
462     test.vpp_clock_offset = time.time() - p.time
463     test.logger.debug("BFD: Calculated vpp clock offset: %s",
464                       test.vpp_clock_offset)
465     if old_offset:
466         test.assertAlmostEqual(
467             old_offset, test.vpp_clock_offset, delta=0.5,
468             msg="vpp clock offset not stable (new: %s, old: %s)" %
469             (test.vpp_clock_offset, old_offset))
470     test.logger.info("BFD: Sending Init")
471     test.test_session.update(my_discriminator=randint(0, 40000000),
472                              your_discriminator=p[BFD].my_discriminator,
473                              state=BFDState.init)
474     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
475             BFDAuthType.meticulous_keyed_sha1:
476         test.test_session.inc_seq_num()
477     test.test_session.send_packet()
478     test.logger.info("BFD: Waiting for event")
479     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
480     verify_event(test, e, expected_state=BFDState.up)
481     test.logger.info("BFD: Session is Up")
482     test.test_session.update(state=BFDState.up)
483     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
484             BFDAuthType.meticulous_keyed_sha1:
485         test.test_session.inc_seq_num()
486     test.test_session.send_packet()
487     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
488
489
490 def bfd_session_down(test):
491     """ Bring BFD session down """
492     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
493     test.test_session.update(state=BFDState.down)
494     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
495             BFDAuthType.meticulous_keyed_sha1:
496         test.test_session.inc_seq_num()
497     test.test_session.send_packet()
498     test.logger.info("BFD: Waiting for event")
499     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
500     verify_event(test, e, expected_state=BFDState.down)
501     test.logger.info("BFD: Session is Down")
502     test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
503
504
505 def verify_bfd_session_config(test, session, state=None):
506     dump = session.get_bfd_udp_session_dump_entry()
507     test.assertIsNotNone(dump)
508     # since dump is not none, we have verified that sw_if_index and addresses
509     # are valid (in get_bfd_udp_session_dump_entry)
510     if state:
511         test.assert_equal(dump.state, state, "session state")
512     test.assert_equal(dump.required_min_rx, session.required_min_rx,
513                       "required min rx interval")
514     test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
515                       "desired min tx interval")
516     test.assert_equal(dump.detect_mult, session.detect_mult,
517                       "detect multiplier")
518     if session.sha1_key is None:
519         test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
520     else:
521         test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
522         test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
523                           "bfd key id")
524         test.assert_equal(dump.conf_key_id,
525                           session.sha1_key.conf_key_id,
526                           "config key id")
527
528
529 def verify_ip(test, packet):
530     """ Verify correctness of IP layer. """
531     if test.vpp_session.af == AF_INET6:
532         ip = packet[IPv6]
533         local_ip = test.pg0.local_ip6
534         remote_ip = test.pg0.remote_ip6
535         test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
536     else:
537         ip = packet[IP]
538         local_ip = test.pg0.local_ip4
539         remote_ip = test.pg0.remote_ip4
540         test.assert_equal(ip.ttl, 255, "IPv4 TTL")
541     test.assert_equal(ip.src, local_ip, "IP source address")
542     test.assert_equal(ip.dst, remote_ip, "IP destination address")
543
544
545 def verify_udp(test, packet):
546     """ Verify correctness of UDP layer. """
547     udp = packet[UDP]
548     test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
549     test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
550                          "UDP source port")
551
552
553 def verify_event(test, event, expected_state):
554     """ Verify correctness of event values. """
555     e = event
556     test.logger.debug("BFD: Event: %s" % repr(e))
557     test.assert_equal(e.sw_if_index,
558                       test.vpp_session.interface.sw_if_index,
559                       "BFD interface index")
560     is_ipv6 = 0
561     if test.vpp_session.af == AF_INET6:
562         is_ipv6 = 1
563     test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
564     if test.vpp_session.af == AF_INET:
565         test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
566                           "Local IPv4 address")
567         test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
568                           "Peer IPv4 address")
569     else:
570         test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
571                           "Local IPv6 address")
572         test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
573                           "Peer IPv6 address")
574     test.assert_equal(e.state, expected_state, BFDState)
575
576
577 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
578     """ wait for BFD packet and verify its correctness
579
580     :param timeout: how long to wait
581     :param pcap_time_min: ignore packets with pcap timestamp lower than this
582
583     :returns: tuple (packet, time spent waiting for packet)
584     """
585     test.logger.info("BFD: Waiting for BFD packet")
586     deadline = time.time() + timeout
587     counter = 0
588     while True:
589         counter += 1
590         # sanity check
591         test.assert_in_range(counter, 0, 100, "number of packets ignored")
592         time_left = deadline - time.time()
593         if time_left < 0:
594             raise CaptureTimeoutError("Packet did not arrive within timeout")
595         p = test.pg0.wait_for_packet(timeout=time_left)
596         test.logger.debug(ppp("BFD: Got packet:", p))
597         if pcap_time_min is not None and p.time < pcap_time_min:
598             test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
599                                   "pcap time min %s):" %
600                                   (p.time, pcap_time_min), p))
601         else:
602             break
603     bfd = p[BFD]
604     if bfd is None:
605         raise Exception(ppp("Unexpected or invalid BFD packet:", p))
606     if bfd.payload:
607         raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
608     verify_ip(test, p)
609     verify_udp(test, p)
610     test.test_session.verify_bfd(p)
611     return p
612
613
614 class BFD4TestCase(VppTestCase):
615     """Bidirectional Forwarding Detection (BFD)"""
616
617     pg0 = None
618     vpp_clock_offset = None
619     vpp_session = None
620     test_session = None
621
622     @classmethod
623     def setUpClass(cls):
624         super(BFD4TestCase, cls).setUpClass()
625         try:
626             cls.create_pg_interfaces([0])
627             cls.create_loopback_interfaces([0])
628             cls.loopback0 = cls.lo_interfaces[0]
629             cls.loopback0.config_ip4()
630             cls.loopback0.admin_up()
631             cls.pg0.config_ip4()
632             cls.pg0.configure_ipv4_neighbors()
633             cls.pg0.admin_up()
634             cls.pg0.resolve_arp()
635
636         except Exception:
637             super(BFD4TestCase, cls).tearDownClass()
638             raise
639
640     def setUp(self):
641         super(BFD4TestCase, self).setUp()
642         self.factory = AuthKeyFactory()
643         self.vapi.want_bfd_events()
644         self.pg0.enable_capture()
645         try:
646             self.vpp_session = VppBFDUDPSession(self, self.pg0,
647                                                 self.pg0.remote_ip4)
648             self.vpp_session.add_vpp_config()
649             self.vpp_session.admin_up()
650             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
651         except:
652             self.vapi.want_bfd_events(enable_disable=0)
653             raise
654
655     def tearDown(self):
656         if not self.vpp_dead:
657             self.vapi.want_bfd_events(enable_disable=0)
658         self.vapi.collect_events()  # clear the event queue
659         super(BFD4TestCase, self).tearDown()
660
661     def test_session_up(self):
662         """ bring BFD session up """
663         bfd_session_up(self)
664
665     def test_session_up_by_ip(self):
666         """ bring BFD session up - first frame looked up by address pair """
667         self.logger.info("BFD: Sending Slow control frame")
668         self.test_session.update(my_discriminator=randint(0, 40000000))
669         self.test_session.send_packet()
670         self.pg0.enable_capture()
671         p = self.pg0.wait_for_packet(1)
672         self.assert_equal(p[BFD].your_discriminator,
673                           self.test_session.my_discriminator,
674                           "BFD - your discriminator")
675         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
676         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
677                                  state=BFDState.up)
678         self.logger.info("BFD: Waiting for event")
679         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
680         verify_event(self, e, expected_state=BFDState.init)
681         self.logger.info("BFD: Sending Up")
682         self.test_session.send_packet()
683         self.logger.info("BFD: Waiting for event")
684         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
685         verify_event(self, e, expected_state=BFDState.up)
686         self.logger.info("BFD: Session is Up")
687         self.test_session.update(state=BFDState.up)
688         self.test_session.send_packet()
689         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
690
691     def test_session_down(self):
692         """ bring BFD session down """
693         bfd_session_up(self)
694         bfd_session_down(self)
695
696     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
697     def test_hold_up(self):
698         """ hold BFD session up """
699         bfd_session_up(self)
700         for dummy in range(self.test_session.detect_mult * 2):
701             wait_for_bfd_packet(self)
702             self.test_session.send_packet()
703         self.assert_equal(len(self.vapi.collect_events()), 0,
704                           "number of bfd events")
705
706     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
707     def test_slow_timer(self):
708         """ verify slow periodic control frames while session down """
709         packet_count = 3
710         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
711         prev_packet = wait_for_bfd_packet(self, 2)
712         for dummy in range(packet_count):
713             next_packet = wait_for_bfd_packet(self, 2)
714             time_diff = next_packet.time - prev_packet.time
715             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
716             # to work around timing issues
717             self.assert_in_range(
718                 time_diff, 0.70, 1.05, "time between slow packets")
719             prev_packet = next_packet
720
721     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
722     def test_zero_remote_min_rx(self):
723         """ no packets when zero remote required min rx interval """
724         bfd_session_up(self)
725         self.test_session.update(required_min_rx=0)
726         self.test_session.send_packet()
727         for dummy in range(self.test_session.detect_mult):
728             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
729                        "sleep before transmitting bfd packet")
730             self.test_session.send_packet()
731             try:
732                 p = wait_for_bfd_packet(self, timeout=0)
733                 self.logger.error(ppp("Received unexpected packet:", p))
734             except CaptureTimeoutError:
735                 pass
736         self.assert_equal(
737             len(self.vapi.collect_events()), 0, "number of bfd events")
738         self.test_session.update(required_min_rx=100000)
739         for dummy in range(3):
740             self.test_session.send_packet()
741             wait_for_bfd_packet(
742                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
743         self.assert_equal(
744             len(self.vapi.collect_events()), 0, "number of bfd events")
745
746     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
747     def test_conn_down(self):
748         """ verify session goes down after inactivity """
749         bfd_session_up(self)
750         detection_time = self.test_session.detect_mult *\
751             self.vpp_session.required_min_rx / USEC_IN_SEC
752         self.sleep(detection_time, "waiting for BFD session time-out")
753         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
754         verify_event(self, e, expected_state=BFDState.down)
755
756     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
757     def test_large_required_min_rx(self):
758         """ large remote required min rx interval """
759         bfd_session_up(self)
760         p = wait_for_bfd_packet(self)
761         interval = 3000000
762         self.test_session.update(required_min_rx=interval)
763         self.test_session.send_packet()
764         time_mark = time.time()
765         count = 0
766         # busy wait here, trying to collect a packet or event, vpp is not
767         # allowed to send packets and the session will timeout first - so the
768         # Up->Down event must arrive before any packets do
769         while time.time() < time_mark + interval / USEC_IN_SEC:
770             try:
771                 p = wait_for_bfd_packet(self, timeout=0)
772                 # if vpp managed to send a packet before we did the session
773                 # session update, then that's fine, ignore it
774                 if p.time < time_mark - self.vpp_clock_offset:
775                     continue
776                 self.logger.error(ppp("Received unexpected packet:", p))
777                 count += 1
778             except CaptureTimeoutError:
779                 pass
780             events = self.vapi.collect_events()
781             if len(events) > 0:
782                 verify_event(self, events[0], BFDState.down)
783                 break
784         self.assert_equal(count, 0, "number of packets received")
785
786     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
787     def test_immediate_remote_min_rx_reduction(self):
788         """ immediately honor remote required min rx reduction """
789         self.vpp_session.remove_vpp_config()
790         self.vpp_session = VppBFDUDPSession(
791             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
792         self.pg0.enable_capture()
793         self.vpp_session.add_vpp_config()
794         self.test_session.update(desired_min_tx=1000000,
795                                  required_min_rx=1000000)
796         bfd_session_up(self)
797         reference_packet = wait_for_bfd_packet(self)
798         time_mark = time.time()
799         interval = 300000
800         self.test_session.update(required_min_rx=interval)
801         self.test_session.send_packet()
802         extra_time = time.time() - time_mark
803         p = wait_for_bfd_packet(self)
804         # first packet is allowed to be late by time we spent doing the update
805         # calculated in extra_time
806         self.assert_in_range(p.time - reference_packet.time,
807                              .95 * 0.75 * interval / USEC_IN_SEC,
808                              1.05 * interval / USEC_IN_SEC + extra_time,
809                              "time between BFD packets")
810         reference_packet = p
811         for dummy in range(3):
812             p = wait_for_bfd_packet(self)
813             diff = p.time - reference_packet.time
814             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
815                                  1.05 * interval / USEC_IN_SEC,
816                                  "time between BFD packets")
817             reference_packet = p
818
819     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
820     def test_modify_req_min_rx_double(self):
821         """ modify session - double required min rx """
822         bfd_session_up(self)
823         p = wait_for_bfd_packet(self)
824         self.test_session.update(desired_min_tx=10000,
825                                  required_min_rx=10000)
826         self.test_session.send_packet()
827         # double required min rx
828         self.vpp_session.modify_parameters(
829             required_min_rx=2 * self.vpp_session.required_min_rx)
830         p = wait_for_bfd_packet(
831             self, pcap_time_min=time.time() - self.vpp_clock_offset)
832         # poll bit needs to be set
833         self.assertIn("P", p.sprintf("%BFD.flags%"),
834                       "Poll bit not set in BFD packet")
835         # finish poll sequence with final packet
836         final = self.test_session.create_packet()
837         final[BFD].flags = "F"
838         timeout = self.test_session.detect_mult * \
839             max(self.test_session.desired_min_tx,
840                 self.vpp_session.required_min_rx) / USEC_IN_SEC
841         self.test_session.send_packet(final)
842         time_mark = time.time()
843         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
844         verify_event(self, e, expected_state=BFDState.down)
845         time_to_event = time.time() - time_mark
846         self.assert_in_range(time_to_event, .9 * timeout,
847                              1.1 * timeout, "session timeout")
848
849     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
850     def test_modify_req_min_rx_halve(self):
851         """ modify session - halve required min rx """
852         self.vpp_session.modify_parameters(
853             required_min_rx=2 * self.vpp_session.required_min_rx)
854         bfd_session_up(self)
855         p = wait_for_bfd_packet(self)
856         self.test_session.update(desired_min_tx=10000,
857                                  required_min_rx=10000)
858         self.test_session.send_packet()
859         p = wait_for_bfd_packet(
860             self, pcap_time_min=time.time() - self.vpp_clock_offset)
861         # halve required min rx
862         old_required_min_rx = self.vpp_session.required_min_rx
863         self.vpp_session.modify_parameters(
864             required_min_rx=0.5 * self.vpp_session.required_min_rx)
865         # now we wait 0.8*3*old-req-min-rx and the session should still be up
866         self.sleep(0.8 * self.vpp_session.detect_mult *
867                    old_required_min_rx / USEC_IN_SEC)
868         self.assert_equal(len(self.vapi.collect_events()), 0,
869                           "number of bfd events")
870         p = wait_for_bfd_packet(self)
871         # poll bit needs to be set
872         self.assertIn("P", p.sprintf("%BFD.flags%"),
873                       "Poll bit not set in BFD packet")
874         # finish poll sequence with final packet
875         final = self.test_session.create_packet()
876         final[BFD].flags = "F"
877         self.test_session.send_packet(final)
878         # now the session should time out under new conditions
879         before = time.time()
880         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
881         after = time.time()
882         detection_time = self.test_session.detect_mult *\
883             self.vpp_session.required_min_rx / USEC_IN_SEC
884         self.assert_in_range(after - before,
885                              0.9 * detection_time,
886                              1.1 * detection_time,
887                              "time before bfd session goes down")
888         verify_event(self, e, expected_state=BFDState.down)
889
890     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
891     def test_modify_detect_mult(self):
892         """ modify detect multiplier """
893         bfd_session_up(self)
894         p = wait_for_bfd_packet(self)
895         self.vpp_session.modify_parameters(detect_mult=1)
896         p = wait_for_bfd_packet(
897             self, pcap_time_min=time.time() - self.vpp_clock_offset)
898         self.assert_equal(self.vpp_session.detect_mult,
899                           p[BFD].detect_mult,
900                           "detect mult")
901         # poll bit must not be set
902         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
903                          "Poll bit not set in BFD packet")
904         self.vpp_session.modify_parameters(detect_mult=10)
905         p = wait_for_bfd_packet(
906             self, pcap_time_min=time.time() - self.vpp_clock_offset)
907         self.assert_equal(self.vpp_session.detect_mult,
908                           p[BFD].detect_mult,
909                           "detect mult")
910         # poll bit must not be set
911         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
912                          "Poll bit not set in BFD packet")
913
914     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
915     def test_queued_poll(self):
916         """ test poll sequence queueing """
917         bfd_session_up(self)
918         p = wait_for_bfd_packet(self)
919         self.vpp_session.modify_parameters(
920             required_min_rx=2 * self.vpp_session.required_min_rx)
921         p = wait_for_bfd_packet(self)
922         poll_sequence_start = time.time()
923         poll_sequence_length_min = 0.5
924         send_final_after = time.time() + poll_sequence_length_min
925         # poll bit needs to be set
926         self.assertIn("P", p.sprintf("%BFD.flags%"),
927                       "Poll bit not set in BFD packet")
928         self.assert_equal(p[BFD].required_min_rx_interval,
929                           self.vpp_session.required_min_rx,
930                           "BFD required min rx interval")
931         self.vpp_session.modify_parameters(
932             required_min_rx=2 * self.vpp_session.required_min_rx)
933         # 2nd poll sequence should be queued now
934         # don't send the reply back yet, wait for some time to emulate
935         # longer round-trip time
936         packet_count = 0
937         while time.time() < send_final_after:
938             self.test_session.send_packet()
939             p = wait_for_bfd_packet(self)
940             self.assert_equal(len(self.vapi.collect_events()), 0,
941                               "number of bfd events")
942             self.assert_equal(p[BFD].required_min_rx_interval,
943                               self.vpp_session.required_min_rx,
944                               "BFD required min rx interval")
945             packet_count += 1
946             # poll bit must be set
947             self.assertIn("P", p.sprintf("%BFD.flags%"),
948                           "Poll bit not set in BFD packet")
949         final = self.test_session.create_packet()
950         final[BFD].flags = "F"
951         self.test_session.send_packet(final)
952         # finish 1st with final
953         poll_sequence_length = time.time() - poll_sequence_start
954         # vpp must wait for some time before starting new poll sequence
955         poll_no_2_started = False
956         for dummy in range(2 * packet_count):
957             p = wait_for_bfd_packet(self)
958             self.assert_equal(len(self.vapi.collect_events()), 0,
959                               "number of bfd events")
960             if "P" in p.sprintf("%BFD.flags%"):
961                 poll_no_2_started = True
962                 if time.time() < poll_sequence_start + poll_sequence_length:
963                     raise Exception("VPP started 2nd poll sequence too soon")
964                 final = self.test_session.create_packet()
965                 final[BFD].flags = "F"
966                 self.test_session.send_packet(final)
967                 break
968             else:
969                 self.test_session.send_packet()
970         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
971         # finish 2nd with final
972         final = self.test_session.create_packet()
973         final[BFD].flags = "F"
974         self.test_session.send_packet(final)
975         p = wait_for_bfd_packet(self)
976         # poll bit must not be set
977         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
978                          "Poll bit set in BFD packet")
979
980     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
981     def test_poll_response(self):
982         """ test correct response to control frame with poll bit set """
983         bfd_session_up(self)
984         poll = self.test_session.create_packet()
985         poll[BFD].flags = "P"
986         self.test_session.send_packet(poll)
987         final = wait_for_bfd_packet(
988             self, pcap_time_min=time.time() - self.vpp_clock_offset)
989         self.assertIn("F", final.sprintf("%BFD.flags%"))
990
991     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
992     def test_no_periodic_if_remote_demand(self):
993         """ no periodic frames outside poll sequence if remote demand set """
994         bfd_session_up(self)
995         demand = self.test_session.create_packet()
996         demand[BFD].flags = "D"
997         self.test_session.send_packet(demand)
998         transmit_time = 0.9 \
999             * max(self.vpp_session.required_min_rx,
1000                   self.test_session.desired_min_tx) \
1001             / USEC_IN_SEC
1002         count = 0
1003         for dummy in range(self.test_session.detect_mult * 2):
1004             time.sleep(transmit_time)
1005             self.test_session.send_packet(demand)
1006             try:
1007                 p = wait_for_bfd_packet(self, timeout=0)
1008                 self.logger.error(ppp("Received unexpected packet:", p))
1009                 count += 1
1010             except CaptureTimeoutError:
1011                 pass
1012         events = self.vapi.collect_events()
1013         for e in events:
1014             self.logger.error("Received unexpected event: %s", e)
1015         self.assert_equal(count, 0, "number of packets received")
1016         self.assert_equal(len(events), 0, "number of events received")
1017
1018     def test_echo_looped_back(self):
1019         """ echo packets looped back """
1020         # don't need a session in this case..
1021         self.vpp_session.remove_vpp_config()
1022         self.pg0.enable_capture()
1023         echo_packet_count = 10
1024         # random source port low enough to increment a few times..
1025         udp_sport_tx = randint(1, 50000)
1026         udp_sport_rx = udp_sport_tx
1027         echo_packet = (Ether(src=self.pg0.remote_mac,
1028                              dst=self.pg0.local_mac) /
1029                        IP(src=self.pg0.remote_ip4,
1030                           dst=self.pg0.remote_ip4) /
1031                        UDP(dport=BFD.udp_dport_echo) /
1032                        Raw("this should be looped back"))
1033         for dummy in range(echo_packet_count):
1034             self.sleep(.01, "delay between echo packets")
1035             echo_packet[UDP].sport = udp_sport_tx
1036             udp_sport_tx += 1
1037             self.logger.debug(ppp("Sending packet:", echo_packet))
1038             self.pg0.add_stream(echo_packet)
1039             self.pg_start()
1040         for dummy in range(echo_packet_count):
1041             p = self.pg0.wait_for_packet(1)
1042             self.logger.debug(ppp("Got packet:", p))
1043             ether = p[Ether]
1044             self.assert_equal(self.pg0.remote_mac,
1045                               ether.dst, "Destination MAC")
1046             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1047             ip = p[IP]
1048             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1049             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1050             udp = p[UDP]
1051             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1052                               "UDP destination port")
1053             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1054             udp_sport_rx += 1
1055             # need to compare the hex payload here, otherwise BFD_vpp_echo
1056             # gets in way
1057             self.assertEqual(str(p[UDP].payload),
1058                              str(echo_packet[UDP].payload),
1059                              "Received packet is not the echo packet sent")
1060         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1061                           "ECHO packet identifier for test purposes)")
1062
1063     def test_echo(self):
1064         """ echo function """
1065         bfd_session_up(self)
1066         self.test_session.update(required_min_echo_rx=50000)
1067         self.test_session.send_packet()
1068         detection_time = self.test_session.detect_mult *\
1069             self.vpp_session.required_min_rx / USEC_IN_SEC
1070         # echo shouldn't work without echo source set
1071         for dummy in range(3):
1072             sleep = 0.75 * detection_time
1073             self.sleep(sleep, "delay before sending bfd packet")
1074             self.test_session.send_packet()
1075         p = wait_for_bfd_packet(
1076             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1077         self.assert_equal(p[BFD].required_min_rx_interval,
1078                           self.vpp_session.required_min_rx,
1079                           "BFD required min rx interval")
1080         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1081         # should be turned on - loopback echo packets
1082         for dummy in range(3):
1083             loop_until = time.time() + 0.75 * detection_time
1084             while time.time() < loop_until:
1085                 p = self.pg0.wait_for_packet(1)
1086                 self.logger.debug(ppp("Got packet:", p))
1087                 if p[UDP].dport == BFD.udp_dport_echo:
1088                     self.assert_equal(
1089                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1090                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1091                                         "BFD ECHO src IP equal to loopback IP")
1092                     self.logger.debug(ppp("Looping back packet:", p))
1093                     self.pg0.add_stream(p)
1094                     self.pg_start()
1095                 elif p.haslayer(BFD):
1096                     self.assertGreaterEqual(p[BFD].required_min_rx_interval,
1097                                             1000000)
1098                     if "P" in p.sprintf("%BFD.flags%"):
1099                         final = self.test_session.create_packet()
1100                         final[BFD].flags = "F"
1101                         self.test_session.send_packet(final)
1102                 else:
1103                     raise Exception(ppp("Received unknown packet:", p))
1104
1105                 self.assert_equal(len(self.vapi.collect_events()), 0,
1106                                   "number of bfd events")
1107             self.test_session.send_packet()
1108
1109     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1110     def test_echo_fail(self):
1111         """ session goes down if echo function fails """
1112         bfd_session_up(self)
1113         self.test_session.update(required_min_echo_rx=50000)
1114         self.test_session.send_packet()
1115         detection_time = self.test_session.detect_mult *\
1116             self.vpp_session.required_min_rx / USEC_IN_SEC
1117         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1118         # echo function should be used now, but we will drop the echo packets
1119         verified_diag = False
1120         for dummy in range(3):
1121             loop_until = time.time() + 0.75 * detection_time
1122             while time.time() < loop_until:
1123                 p = self.pg0.wait_for_packet(1)
1124                 self.logger.debug(ppp("Got packet:", p))
1125                 if p[UDP].dport == BFD.udp_dport_echo:
1126                     # dropped
1127                     pass
1128                 elif p.haslayer(BFD):
1129                     if "P" in p.sprintf("%BFD.flags%"):
1130                         self.assertGreaterEqual(
1131                             p[BFD].required_min_rx_interval,
1132                             1000000)
1133                         final = self.test_session.create_packet()
1134                         final[BFD].flags = "F"
1135                         self.test_session.send_packet(final)
1136                     if p[BFD].state == BFDState.down:
1137                         self.assert_equal(p[BFD].diag,
1138                                           BFDDiagCode.echo_function_failed,
1139                                           BFDDiagCode)
1140                         verified_diag = True
1141                 else:
1142                     raise Exception(ppp("Received unknown packet:", p))
1143             self.test_session.send_packet()
1144         events = self.vapi.collect_events()
1145         self.assert_equal(len(events), 1, "number of bfd events")
1146         self.assert_equal(events[0].state, BFDState.down, BFDState)
1147         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1148
1149     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1150     def test_echo_stop(self):
1151         """ echo function stops if peer sets required min echo rx zero """
1152         bfd_session_up(self)
1153         self.test_session.update(required_min_echo_rx=50000)
1154         self.test_session.send_packet()
1155         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1156         # wait for first echo packet
1157         while True:
1158             p = self.pg0.wait_for_packet(1)
1159             self.logger.debug(ppp("Got packet:", p))
1160             if p[UDP].dport == BFD.udp_dport_echo:
1161                 self.logger.debug(ppp("Looping back packet:", p))
1162                 self.pg0.add_stream(p)
1163                 self.pg_start()
1164                 break
1165             elif p.haslayer(BFD):
1166                 # ignore BFD
1167                 pass
1168             else:
1169                 raise Exception(ppp("Received unknown packet:", p))
1170         self.test_session.update(required_min_echo_rx=0)
1171         self.test_session.send_packet()
1172         # echo packets shouldn't arrive anymore
1173         for dummy in range(5):
1174             wait_for_bfd_packet(
1175                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1176             self.test_session.send_packet()
1177             events = self.vapi.collect_events()
1178             self.assert_equal(len(events), 0, "number of bfd events")
1179
1180     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1181     def test_echo_source_removed(self):
1182         """ echo function stops if echo source is removed """
1183         bfd_session_up(self)
1184         self.test_session.update(required_min_echo_rx=50000)
1185         self.test_session.send_packet()
1186         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1187         # wait for first echo packet
1188         while True:
1189             p = self.pg0.wait_for_packet(1)
1190             self.logger.debug(ppp("Got packet:", p))
1191             if p[UDP].dport == BFD.udp_dport_echo:
1192                 self.logger.debug(ppp("Looping back packet:", p))
1193                 self.pg0.add_stream(p)
1194                 self.pg_start()
1195                 break
1196             elif p.haslayer(BFD):
1197                 # ignore BFD
1198                 pass
1199             else:
1200                 raise Exception(ppp("Received unknown packet:", p))
1201         self.vapi.bfd_udp_del_echo_source()
1202         self.test_session.send_packet()
1203         # echo packets shouldn't arrive anymore
1204         for dummy in range(5):
1205             wait_for_bfd_packet(
1206                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1207             self.test_session.send_packet()
1208             events = self.vapi.collect_events()
1209             self.assert_equal(len(events), 0, "number of bfd events")
1210
1211     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1212     def test_stale_echo(self):
1213         """ stale echo packets don't keep a session up """
1214         bfd_session_up(self)
1215         self.test_session.update(required_min_echo_rx=50000)
1216         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1217         self.test_session.send_packet()
1218         # should be turned on - loopback echo packets
1219         echo_packet = None
1220         timeout_at = None
1221         timeout_ok = False
1222         for dummy in range(10 * self.vpp_session.detect_mult):
1223             p = self.pg0.wait_for_packet(1)
1224             if p[UDP].dport == BFD.udp_dport_echo:
1225                 if echo_packet is None:
1226                     self.logger.debug(ppp("Got first echo packet:", p))
1227                     echo_packet = p
1228                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1229                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1230                 else:
1231                     self.logger.debug(ppp("Got followup echo packet:", p))
1232                 self.logger.debug(ppp("Looping back first echo packet:", p))
1233                 self.pg0.add_stream(echo_packet)
1234                 self.pg_start()
1235             elif p.haslayer(BFD):
1236                 self.logger.debug(ppp("Got packet:", p))
1237                 if "P" in p.sprintf("%BFD.flags%"):
1238                     final = self.test_session.create_packet()
1239                     final[BFD].flags = "F"
1240                     self.test_session.send_packet(final)
1241                 if p[BFD].state == BFDState.down:
1242                     self.assertIsNotNone(
1243                         timeout_at,
1244                         "Session went down before first echo packet received")
1245                     now = time.time()
1246                     self.assertGreaterEqual(
1247                         now, timeout_at,
1248                         "Session timeout at %s, but is expected at %s" %
1249                         (now, timeout_at))
1250                     self.assert_equal(p[BFD].diag,
1251                                       BFDDiagCode.echo_function_failed,
1252                                       BFDDiagCode)
1253                     events = self.vapi.collect_events()
1254                     self.assert_equal(len(events), 1, "number of bfd events")
1255                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1256                     timeout_ok = True
1257                     break
1258             else:
1259                 raise Exception(ppp("Received unknown packet:", p))
1260             self.test_session.send_packet()
1261         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1262
1263     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1264     def test_invalid_echo_checksum(self):
1265         """ echo packets with invalid checksum don't keep a session up """
1266         bfd_session_up(self)
1267         self.test_session.update(required_min_echo_rx=50000)
1268         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1269         self.test_session.send_packet()
1270         # should be turned on - loopback echo packets
1271         timeout_at = None
1272         timeout_ok = False
1273         for dummy in range(10 * self.vpp_session.detect_mult):
1274             p = self.pg0.wait_for_packet(1)
1275             if p[UDP].dport == BFD.udp_dport_echo:
1276                 self.logger.debug(ppp("Got echo packet:", p))
1277                 if timeout_at is None:
1278                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1279                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1280                 p[BFD_vpp_echo].checksum = getrandbits(64)
1281                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1282                 self.pg0.add_stream(p)
1283                 self.pg_start()
1284             elif p.haslayer(BFD):
1285                 self.logger.debug(ppp("Got packet:", p))
1286                 if "P" in p.sprintf("%BFD.flags%"):
1287                     final = self.test_session.create_packet()
1288                     final[BFD].flags = "F"
1289                     self.test_session.send_packet(final)
1290                 if p[BFD].state == BFDState.down:
1291                     self.assertIsNotNone(
1292                         timeout_at,
1293                         "Session went down before first echo packet received")
1294                     now = time.time()
1295                     self.assertGreaterEqual(
1296                         now, timeout_at,
1297                         "Session timeout at %s, but is expected at %s" %
1298                         (now, timeout_at))
1299                     self.assert_equal(p[BFD].diag,
1300                                       BFDDiagCode.echo_function_failed,
1301                                       BFDDiagCode)
1302                     events = self.vapi.collect_events()
1303                     self.assert_equal(len(events), 1, "number of bfd events")
1304                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1305                     timeout_ok = True
1306                     break
1307             else:
1308                 raise Exception(ppp("Received unknown packet:", p))
1309             self.test_session.send_packet()
1310         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1311
1312     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1313     def test_admin_up_down(self):
1314         """ put session admin-up and admin-down """
1315         bfd_session_up(self)
1316         self.vpp_session.admin_down()
1317         self.pg0.enable_capture()
1318         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1319         verify_event(self, e, expected_state=BFDState.admin_down)
1320         for dummy in range(2):
1321             p = wait_for_bfd_packet(self)
1322             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1323         # try to bring session up - shouldn't be possible
1324         self.test_session.update(state=BFDState.init)
1325         self.test_session.send_packet()
1326         for dummy in range(2):
1327             p = wait_for_bfd_packet(self)
1328             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1329         self.vpp_session.admin_up()
1330         self.test_session.update(state=BFDState.down)
1331         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1332         verify_event(self, e, expected_state=BFDState.down)
1333         p = wait_for_bfd_packet(
1334             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1335         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1336         self.test_session.send_packet()
1337         p = wait_for_bfd_packet(
1338             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1339         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1340         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1341         verify_event(self, e, expected_state=BFDState.init)
1342         self.test_session.update(state=BFDState.up)
1343         self.test_session.send_packet()
1344         p = wait_for_bfd_packet(
1345             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1346         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1347         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1348         verify_event(self, e, expected_state=BFDState.up)
1349
1350     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1351     def test_config_change_remote_demand(self):
1352         """ configuration change while peer in demand mode """
1353         bfd_session_up(self)
1354         demand = self.test_session.create_packet()
1355         demand[BFD].flags = "D"
1356         self.test_session.send_packet(demand)
1357         self.vpp_session.modify_parameters(
1358             required_min_rx=2 * self.vpp_session.required_min_rx)
1359         p = wait_for_bfd_packet(
1360             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1361         # poll bit must be set
1362         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1363         # terminate poll sequence
1364         final = self.test_session.create_packet()
1365         final[BFD].flags = "D+F"
1366         self.test_session.send_packet(final)
1367         # vpp should be quiet now again
1368         transmit_time = 0.9 \
1369             * max(self.vpp_session.required_min_rx,
1370                   self.test_session.desired_min_tx) \
1371             / USEC_IN_SEC
1372         count = 0
1373         for dummy in range(self.test_session.detect_mult * 2):
1374             time.sleep(transmit_time)
1375             self.test_session.send_packet(demand)
1376             try:
1377                 p = wait_for_bfd_packet(self, timeout=0)
1378                 self.logger.error(ppp("Received unexpected packet:", p))
1379                 count += 1
1380             except CaptureTimeoutError:
1381                 pass
1382         events = self.vapi.collect_events()
1383         for e in events:
1384             self.logger.error("Received unexpected event: %s", e)
1385         self.assert_equal(count, 0, "number of packets received")
1386         self.assert_equal(len(events), 0, "number of events received")
1387
1388
1389 class BFD6TestCase(VppTestCase):
1390     """Bidirectional Forwarding Detection (BFD) (IPv6) """
1391
1392     pg0 = None
1393     vpp_clock_offset = None
1394     vpp_session = None
1395     test_session = None
1396
1397     @classmethod
1398     def setUpClass(cls):
1399         super(BFD6TestCase, cls).setUpClass()
1400         try:
1401             cls.create_pg_interfaces([0])
1402             cls.pg0.config_ip6()
1403             cls.pg0.configure_ipv6_neighbors()
1404             cls.pg0.admin_up()
1405             cls.pg0.resolve_ndp()
1406             cls.create_loopback_interfaces([0])
1407             cls.loopback0 = cls.lo_interfaces[0]
1408             cls.loopback0.config_ip6()
1409             cls.loopback0.admin_up()
1410
1411         except Exception:
1412             super(BFD6TestCase, cls).tearDownClass()
1413             raise
1414
1415     def setUp(self):
1416         super(BFD6TestCase, self).setUp()
1417         self.factory = AuthKeyFactory()
1418         self.vapi.want_bfd_events()
1419         self.pg0.enable_capture()
1420         try:
1421             self.vpp_session = VppBFDUDPSession(self, self.pg0,
1422                                                 self.pg0.remote_ip6,
1423                                                 af=AF_INET6)
1424             self.vpp_session.add_vpp_config()
1425             self.vpp_session.admin_up()
1426             self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1427             self.logger.debug(self.vapi.cli("show adj nbr"))
1428         except:
1429             self.vapi.want_bfd_events(enable_disable=0)
1430             raise
1431
1432     def tearDown(self):
1433         if not self.vpp_dead:
1434             self.vapi.want_bfd_events(enable_disable=0)
1435         self.vapi.collect_events()  # clear the event queue
1436         super(BFD6TestCase, self).tearDown()
1437
1438     def test_session_up(self):
1439         """ bring BFD session up """
1440         bfd_session_up(self)
1441
1442     def test_session_up_by_ip(self):
1443         """ bring BFD session up - first frame looked up by address pair """
1444         self.logger.info("BFD: Sending Slow control frame")
1445         self.test_session.update(my_discriminator=randint(0, 40000000))
1446         self.test_session.send_packet()
1447         self.pg0.enable_capture()
1448         p = self.pg0.wait_for_packet(1)
1449         self.assert_equal(p[BFD].your_discriminator,
1450                           self.test_session.my_discriminator,
1451                           "BFD - your discriminator")
1452         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1453         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1454                                  state=BFDState.up)
1455         self.logger.info("BFD: Waiting for event")
1456         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1457         verify_event(self, e, expected_state=BFDState.init)
1458         self.logger.info("BFD: Sending Up")
1459         self.test_session.send_packet()
1460         self.logger.info("BFD: Waiting for event")
1461         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1462         verify_event(self, e, expected_state=BFDState.up)
1463         self.logger.info("BFD: Session is Up")
1464         self.test_session.update(state=BFDState.up)
1465         self.test_session.send_packet()
1466         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1467
1468     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1469     def test_hold_up(self):
1470         """ hold BFD session up """
1471         bfd_session_up(self)
1472         for dummy in range(self.test_session.detect_mult * 2):
1473             wait_for_bfd_packet(self)
1474             self.test_session.send_packet()
1475         self.assert_equal(len(self.vapi.collect_events()), 0,
1476                           "number of bfd events")
1477         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1478
1479     def test_echo_looped_back(self):
1480         """ echo packets looped back """
1481         # don't need a session in this case..
1482         self.vpp_session.remove_vpp_config()
1483         self.pg0.enable_capture()
1484         echo_packet_count = 10
1485         # random source port low enough to increment a few times..
1486         udp_sport_tx = randint(1, 50000)
1487         udp_sport_rx = udp_sport_tx
1488         echo_packet = (Ether(src=self.pg0.remote_mac,
1489                              dst=self.pg0.local_mac) /
1490                        IPv6(src=self.pg0.remote_ip6,
1491                             dst=self.pg0.remote_ip6) /
1492                        UDP(dport=BFD.udp_dport_echo) /
1493                        Raw("this should be looped back"))
1494         for dummy in range(echo_packet_count):
1495             self.sleep(.01, "delay between echo packets")
1496             echo_packet[UDP].sport = udp_sport_tx
1497             udp_sport_tx += 1
1498             self.logger.debug(ppp("Sending packet:", echo_packet))
1499             self.pg0.add_stream(echo_packet)
1500             self.pg_start()
1501         for dummy in range(echo_packet_count):
1502             p = self.pg0.wait_for_packet(1)
1503             self.logger.debug(ppp("Got packet:", p))
1504             ether = p[Ether]
1505             self.assert_equal(self.pg0.remote_mac,
1506                               ether.dst, "Destination MAC")
1507             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1508             ip = p[IPv6]
1509             self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1510             self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1511             udp = p[UDP]
1512             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1513                               "UDP destination port")
1514             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1515             udp_sport_rx += 1
1516             # need to compare the hex payload here, otherwise BFD_vpp_echo
1517             # gets in way
1518             self.assertEqual(str(p[UDP].payload),
1519                              str(echo_packet[UDP].payload),
1520                              "Received packet is not the echo packet sent")
1521         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1522                           "ECHO packet identifier for test purposes)")
1523         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1524                           "ECHO packet identifier for test purposes)")
1525
1526     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1527     def test_echo(self):
1528         """ echo function used """
1529         bfd_session_up(self)
1530         self.test_session.update(required_min_echo_rx=50000)
1531         self.test_session.send_packet()
1532         detection_time = self.test_session.detect_mult *\
1533             self.vpp_session.required_min_rx / USEC_IN_SEC
1534         # echo shouldn't work without echo source set
1535         for dummy in range(3):
1536             sleep = 0.75 * detection_time
1537             self.sleep(sleep, "delay before sending bfd packet")
1538             self.test_session.send_packet()
1539         p = wait_for_bfd_packet(
1540             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1541         self.assert_equal(p[BFD].required_min_rx_interval,
1542                           self.vpp_session.required_min_rx,
1543                           "BFD required min rx interval")
1544         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1545         # should be turned on - loopback echo packets
1546         for dummy in range(3):
1547             loop_until = time.time() + 0.75 * detection_time
1548             while time.time() < loop_until:
1549                 p = self.pg0.wait_for_packet(1)
1550                 self.logger.debug(ppp("Got packet:", p))
1551                 if p[UDP].dport == BFD.udp_dport_echo:
1552                     self.assert_equal(
1553                         p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1554                     self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1555                                         "BFD ECHO src IP equal to loopback IP")
1556                     self.logger.debug(ppp("Looping back packet:", p))
1557                     self.pg0.add_stream(p)
1558                     self.pg_start()
1559                 elif p.haslayer(BFD):
1560                     self.assertGreaterEqual(p[BFD].required_min_rx_interval,
1561                                             1000000)
1562                     if "P" in p.sprintf("%BFD.flags%"):
1563                         final = self.test_session.create_packet()
1564                         final[BFD].flags = "F"
1565                         self.test_session.send_packet(final)
1566                 else:
1567                     raise Exception(ppp("Received unknown packet:", p))
1568
1569                 self.assert_equal(len(self.vapi.collect_events()), 0,
1570                                   "number of bfd events")
1571             self.test_session.send_packet()
1572
1573
1574 class BFDSHA1TestCase(VppTestCase):
1575     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1576
1577     pg0 = None
1578     vpp_clock_offset = None
1579     vpp_session = None
1580     test_session = None
1581
1582     @classmethod
1583     def setUpClass(cls):
1584         super(BFDSHA1TestCase, cls).setUpClass()
1585         try:
1586             cls.create_pg_interfaces([0])
1587             cls.pg0.config_ip4()
1588             cls.pg0.admin_up()
1589             cls.pg0.resolve_arp()
1590
1591         except Exception:
1592             super(BFDSHA1TestCase, cls).tearDownClass()
1593             raise
1594
1595     def setUp(self):
1596         super(BFDSHA1TestCase, self).setUp()
1597         self.factory = AuthKeyFactory()
1598         self.vapi.want_bfd_events()
1599         self.pg0.enable_capture()
1600
1601     def tearDown(self):
1602         if not self.vpp_dead:
1603             self.vapi.want_bfd_events(enable_disable=0)
1604         self.vapi.collect_events()  # clear the event queue
1605         super(BFDSHA1TestCase, self).tearDown()
1606
1607     def test_session_up(self):
1608         """ bring BFD session up """
1609         key = self.factory.create_random_key(self)
1610         key.add_vpp_config()
1611         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1612                                             self.pg0.remote_ip4,
1613                                             sha1_key=key)
1614         self.vpp_session.add_vpp_config()
1615         self.vpp_session.admin_up()
1616         self.test_session = BFDTestSession(
1617             self, self.pg0, AF_INET, sha1_key=key,
1618             bfd_key_id=self.vpp_session.bfd_key_id)
1619         bfd_session_up(self)
1620
1621     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1622     def test_hold_up(self):
1623         """ hold BFD session up """
1624         key = self.factory.create_random_key(self)
1625         key.add_vpp_config()
1626         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1627                                             self.pg0.remote_ip4,
1628                                             sha1_key=key)
1629         self.vpp_session.add_vpp_config()
1630         self.vpp_session.admin_up()
1631         self.test_session = BFDTestSession(
1632             self, self.pg0, AF_INET, sha1_key=key,
1633             bfd_key_id=self.vpp_session.bfd_key_id)
1634         bfd_session_up(self)
1635         for dummy in range(self.test_session.detect_mult * 2):
1636             wait_for_bfd_packet(self)
1637             self.test_session.send_packet()
1638         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1639
1640     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1641     def test_hold_up_meticulous(self):
1642         """ hold BFD session up - meticulous auth """
1643         key = self.factory.create_random_key(
1644             self, BFDAuthType.meticulous_keyed_sha1)
1645         key.add_vpp_config()
1646         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1647                                             self.pg0.remote_ip4, sha1_key=key)
1648         self.vpp_session.add_vpp_config()
1649         self.vpp_session.admin_up()
1650         # specify sequence number so that it wraps
1651         self.test_session = BFDTestSession(
1652             self, self.pg0, AF_INET, sha1_key=key,
1653             bfd_key_id=self.vpp_session.bfd_key_id,
1654             our_seq_number=0xFFFFFFFF - 4)
1655         bfd_session_up(self)
1656         for dummy in range(30):
1657             wait_for_bfd_packet(self)
1658             self.test_session.inc_seq_num()
1659             self.test_session.send_packet()
1660         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1661
1662     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1663     def test_send_bad_seq_number(self):
1664         """ session is not kept alive by msgs with bad sequence numbers"""
1665         key = self.factory.create_random_key(
1666             self, BFDAuthType.meticulous_keyed_sha1)
1667         key.add_vpp_config()
1668         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1669                                             self.pg0.remote_ip4, sha1_key=key)
1670         self.vpp_session.add_vpp_config()
1671         self.test_session = BFDTestSession(
1672             self, self.pg0, AF_INET, sha1_key=key,
1673             bfd_key_id=self.vpp_session.bfd_key_id)
1674         bfd_session_up(self)
1675         detection_time = self.test_session.detect_mult *\
1676             self.vpp_session.required_min_rx / USEC_IN_SEC
1677         send_until = time.time() + 2 * detection_time
1678         while time.time() < send_until:
1679             self.test_session.send_packet()
1680             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1681                        "time between bfd packets")
1682         e = self.vapi.collect_events()
1683         # session should be down now, because the sequence numbers weren't
1684         # updated
1685         self.assert_equal(len(e), 1, "number of bfd events")
1686         verify_event(self, e[0], expected_state=BFDState.down)
1687
1688     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1689                                        legitimate_test_session,
1690                                        rogue_test_session,
1691                                        rogue_bfd_values=None):
1692         """ execute a rogue session interaction scenario
1693
1694         1. create vpp session, add config
1695         2. bring the legitimate session up
1696         3. copy the bfd values from legitimate session to rogue session
1697         4. apply rogue_bfd_values to rogue session
1698         5. set rogue session state to down
1699         6. send message to take the session down from the rogue session
1700         7. assert that the legitimate session is unaffected
1701         """
1702
1703         self.vpp_session = vpp_bfd_udp_session
1704         self.vpp_session.add_vpp_config()
1705         self.test_session = legitimate_test_session
1706         # bring vpp session up
1707         bfd_session_up(self)
1708         # send packet from rogue session
1709         rogue_test_session.update(
1710             my_discriminator=self.test_session.my_discriminator,
1711             your_discriminator=self.test_session.your_discriminator,
1712             desired_min_tx=self.test_session.desired_min_tx,
1713             required_min_rx=self.test_session.required_min_rx,
1714             detect_mult=self.test_session.detect_mult,
1715             diag=self.test_session.diag,
1716             state=self.test_session.state,
1717             auth_type=self.test_session.auth_type)
1718         if rogue_bfd_values:
1719             rogue_test_session.update(**rogue_bfd_values)
1720         rogue_test_session.update(state=BFDState.down)
1721         rogue_test_session.send_packet()
1722         wait_for_bfd_packet(self)
1723         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1724
1725     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1726     def test_mismatch_auth(self):
1727         """ session is not brought down by unauthenticated msg """
1728         key = self.factory.create_random_key(self)
1729         key.add_vpp_config()
1730         vpp_session = VppBFDUDPSession(
1731             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1732         legitimate_test_session = BFDTestSession(
1733             self, self.pg0, AF_INET, sha1_key=key,
1734             bfd_key_id=vpp_session.bfd_key_id)
1735         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1736         self.execute_rogue_session_scenario(vpp_session,
1737                                             legitimate_test_session,
1738                                             rogue_test_session)
1739
1740     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1741     def test_mismatch_bfd_key_id(self):
1742         """ session is not brought down by msg with non-existent key-id """
1743         key = self.factory.create_random_key(self)
1744         key.add_vpp_config()
1745         vpp_session = VppBFDUDPSession(
1746             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1747         # pick a different random bfd key id
1748         x = randint(0, 255)
1749         while x == vpp_session.bfd_key_id:
1750             x = randint(0, 255)
1751         legitimate_test_session = BFDTestSession(
1752             self, self.pg0, AF_INET, sha1_key=key,
1753             bfd_key_id=vpp_session.bfd_key_id)
1754         rogue_test_session = BFDTestSession(
1755             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1756         self.execute_rogue_session_scenario(vpp_session,
1757                                             legitimate_test_session,
1758                                             rogue_test_session)
1759
1760     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1761     def test_mismatched_auth_type(self):
1762         """ session is not brought down by msg with wrong auth type """
1763         key = self.factory.create_random_key(self)
1764         key.add_vpp_config()
1765         vpp_session = VppBFDUDPSession(
1766             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1767         legitimate_test_session = BFDTestSession(
1768             self, self.pg0, AF_INET, sha1_key=key,
1769             bfd_key_id=vpp_session.bfd_key_id)
1770         rogue_test_session = BFDTestSession(
1771             self, self.pg0, AF_INET, sha1_key=key,
1772             bfd_key_id=vpp_session.bfd_key_id)
1773         self.execute_rogue_session_scenario(
1774             vpp_session, legitimate_test_session, rogue_test_session,
1775             {'auth_type': BFDAuthType.keyed_md5})
1776
1777     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1778     def test_restart(self):
1779         """ simulate remote peer restart and resynchronization """
1780         key = self.factory.create_random_key(
1781             self, BFDAuthType.meticulous_keyed_sha1)
1782         key.add_vpp_config()
1783         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1784                                             self.pg0.remote_ip4, sha1_key=key)
1785         self.vpp_session.add_vpp_config()
1786         self.test_session = BFDTestSession(
1787             self, self.pg0, AF_INET, sha1_key=key,
1788             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
1789         bfd_session_up(self)
1790         # don't send any packets for 2*detection_time
1791         detection_time = self.test_session.detect_mult *\
1792             self.vpp_session.required_min_rx / USEC_IN_SEC
1793         self.sleep(2 * detection_time, "simulating peer restart")
1794         events = self.vapi.collect_events()
1795         self.assert_equal(len(events), 1, "number of bfd events")
1796         verify_event(self, events[0], expected_state=BFDState.down)
1797         self.test_session.update(state=BFDState.down)
1798         # reset sequence number
1799         self.test_session.our_seq_number = 0
1800         self.test_session.vpp_seq_number = None
1801         # now throw away any pending packets
1802         self.pg0.enable_capture()
1803         bfd_session_up(self)
1804
1805
1806 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1807 class BFDAuthOnOffTestCase(VppTestCase):
1808     """Bidirectional Forwarding Detection (BFD) (changing auth) """
1809
1810     pg0 = None
1811     vpp_session = None
1812     test_session = None
1813
1814     @classmethod
1815     def setUpClass(cls):
1816         super(BFDAuthOnOffTestCase, cls).setUpClass()
1817         try:
1818             cls.create_pg_interfaces([0])
1819             cls.pg0.config_ip4()
1820             cls.pg0.admin_up()
1821             cls.pg0.resolve_arp()
1822
1823         except Exception:
1824             super(BFDAuthOnOffTestCase, cls).tearDownClass()
1825             raise
1826
1827     def setUp(self):
1828         super(BFDAuthOnOffTestCase, self).setUp()
1829         self.factory = AuthKeyFactory()
1830         self.vapi.want_bfd_events()
1831         self.pg0.enable_capture()
1832
1833     def tearDown(self):
1834         if not self.vpp_dead:
1835             self.vapi.want_bfd_events(enable_disable=0)
1836         self.vapi.collect_events()  # clear the event queue
1837         super(BFDAuthOnOffTestCase, self).tearDown()
1838
1839     def test_auth_on_immediate(self):
1840         """ turn auth on without disturbing session state (immediate) """
1841         key = self.factory.create_random_key(self)
1842         key.add_vpp_config()
1843         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1844                                             self.pg0.remote_ip4)
1845         self.vpp_session.add_vpp_config()
1846         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1847         bfd_session_up(self)
1848         for dummy in range(self.test_session.detect_mult * 2):
1849             p = wait_for_bfd_packet(self)
1850             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1851             self.test_session.send_packet()
1852         self.vpp_session.activate_auth(key)
1853         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1854         self.test_session.sha1_key = key
1855         for dummy in range(self.test_session.detect_mult * 2):
1856             p = wait_for_bfd_packet(self)
1857             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1858             self.test_session.send_packet()
1859         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1860         self.assert_equal(len(self.vapi.collect_events()), 0,
1861                           "number of bfd events")
1862
1863     def test_auth_off_immediate(self):
1864         """ turn auth off without disturbing session state (immediate) """
1865         key = self.factory.create_random_key(self)
1866         key.add_vpp_config()
1867         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1868                                             self.pg0.remote_ip4, sha1_key=key)
1869         self.vpp_session.add_vpp_config()
1870         self.test_session = BFDTestSession(
1871             self, self.pg0, AF_INET, sha1_key=key,
1872             bfd_key_id=self.vpp_session.bfd_key_id)
1873         bfd_session_up(self)
1874         # self.vapi.want_bfd_events(enable_disable=0)
1875         for dummy in range(self.test_session.detect_mult * 2):
1876             p = wait_for_bfd_packet(self)
1877             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1878             self.test_session.inc_seq_num()
1879             self.test_session.send_packet()
1880         self.vpp_session.deactivate_auth()
1881         self.test_session.bfd_key_id = None
1882         self.test_session.sha1_key = None
1883         for dummy in range(self.test_session.detect_mult * 2):
1884             p = wait_for_bfd_packet(self)
1885             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1886             self.test_session.inc_seq_num()
1887             self.test_session.send_packet()
1888         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1889         self.assert_equal(len(self.vapi.collect_events()), 0,
1890                           "number of bfd events")
1891
1892     def test_auth_change_key_immediate(self):
1893         """ change auth key without disturbing session state (immediate) """
1894         key1 = self.factory.create_random_key(self)
1895         key1.add_vpp_config()
1896         key2 = self.factory.create_random_key(self)
1897         key2.add_vpp_config()
1898         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1899                                             self.pg0.remote_ip4, sha1_key=key1)
1900         self.vpp_session.add_vpp_config()
1901         self.test_session = BFDTestSession(
1902             self, self.pg0, AF_INET, sha1_key=key1,
1903             bfd_key_id=self.vpp_session.bfd_key_id)
1904         bfd_session_up(self)
1905         for dummy in range(self.test_session.detect_mult * 2):
1906             p = wait_for_bfd_packet(self)
1907             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1908             self.test_session.send_packet()
1909         self.vpp_session.activate_auth(key2)
1910         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1911         self.test_session.sha1_key = key2
1912         for dummy in range(self.test_session.detect_mult * 2):
1913             p = wait_for_bfd_packet(self)
1914             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1915             self.test_session.send_packet()
1916         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1917         self.assert_equal(len(self.vapi.collect_events()), 0,
1918                           "number of bfd events")
1919
1920     def test_auth_on_delayed(self):
1921         """ turn auth on without disturbing session state (delayed) """
1922         key = self.factory.create_random_key(self)
1923         key.add_vpp_config()
1924         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1925                                             self.pg0.remote_ip4)
1926         self.vpp_session.add_vpp_config()
1927         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1928         bfd_session_up(self)
1929         for dummy in range(self.test_session.detect_mult * 2):
1930             wait_for_bfd_packet(self)
1931             self.test_session.send_packet()
1932         self.vpp_session.activate_auth(key, delayed=True)
1933         for dummy in range(self.test_session.detect_mult * 2):
1934             p = wait_for_bfd_packet(self)
1935             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1936             self.test_session.send_packet()
1937         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1938         self.test_session.sha1_key = key
1939         self.test_session.send_packet()
1940         for dummy in range(self.test_session.detect_mult * 2):
1941             p = wait_for_bfd_packet(self)
1942             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1943             self.test_session.send_packet()
1944         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1945         self.assert_equal(len(self.vapi.collect_events()), 0,
1946                           "number of bfd events")
1947
1948     def test_auth_off_delayed(self):
1949         """ turn auth off without disturbing session state (delayed) """
1950         key = self.factory.create_random_key(self)
1951         key.add_vpp_config()
1952         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1953                                             self.pg0.remote_ip4, sha1_key=key)
1954         self.vpp_session.add_vpp_config()
1955         self.test_session = BFDTestSession(
1956             self, self.pg0, AF_INET, sha1_key=key,
1957             bfd_key_id=self.vpp_session.bfd_key_id)
1958         bfd_session_up(self)
1959         for dummy in range(self.test_session.detect_mult * 2):
1960             p = wait_for_bfd_packet(self)
1961             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1962             self.test_session.send_packet()
1963         self.vpp_session.deactivate_auth(delayed=True)
1964         for dummy in range(self.test_session.detect_mult * 2):
1965             p = wait_for_bfd_packet(self)
1966             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1967             self.test_session.send_packet()
1968         self.test_session.bfd_key_id = None
1969         self.test_session.sha1_key = None
1970         self.test_session.send_packet()
1971         for dummy in range(self.test_session.detect_mult * 2):
1972             p = wait_for_bfd_packet(self)
1973             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1974             self.test_session.send_packet()
1975         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1976         self.assert_equal(len(self.vapi.collect_events()), 0,
1977                           "number of bfd events")
1978
1979     def test_auth_change_key_delayed(self):
1980         """ change auth key without disturbing session state (delayed) """
1981         key1 = self.factory.create_random_key(self)
1982         key1.add_vpp_config()
1983         key2 = self.factory.create_random_key(self)
1984         key2.add_vpp_config()
1985         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1986                                             self.pg0.remote_ip4, sha1_key=key1)
1987         self.vpp_session.add_vpp_config()
1988         self.vpp_session.admin_up()
1989         self.test_session = BFDTestSession(
1990             self, self.pg0, AF_INET, sha1_key=key1,
1991             bfd_key_id=self.vpp_session.bfd_key_id)
1992         bfd_session_up(self)
1993         for dummy in range(self.test_session.detect_mult * 2):
1994             p = wait_for_bfd_packet(self)
1995             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1996             self.test_session.send_packet()
1997         self.vpp_session.activate_auth(key2, delayed=True)
1998         for dummy in range(self.test_session.detect_mult * 2):
1999             p = wait_for_bfd_packet(self)
2000             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2001             self.test_session.send_packet()
2002         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2003         self.test_session.sha1_key = key2
2004         self.test_session.send_packet()
2005         for dummy in range(self.test_session.detect_mult * 2):
2006             p = wait_for_bfd_packet(self)
2007             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2008             self.test_session.send_packet()
2009         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2010         self.assert_equal(len(self.vapi.collect_events()), 0,
2011                           "number of bfd events")
2012
2013
2014 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2015 class BFDCLITestCase(VppTestCase):
2016     """Bidirectional Forwarding Detection (BFD) (CLI) """
2017     pg0 = None
2018
2019     @classmethod
2020     def setUpClass(cls):
2021         super(BFDCLITestCase, cls).setUpClass()
2022
2023         try:
2024             cls.create_pg_interfaces((0,))
2025             cls.pg0.config_ip4()
2026             cls.pg0.config_ip6()
2027             cls.pg0.resolve_arp()
2028             cls.pg0.resolve_ndp()
2029
2030         except Exception:
2031             super(BFDCLITestCase, cls).tearDownClass()
2032             raise
2033
2034     def setUp(self):
2035         super(BFDCLITestCase, self).setUp()
2036         self.factory = AuthKeyFactory()
2037         self.pg0.enable_capture()
2038
2039     def tearDown(self):
2040         try:
2041             self.vapi.want_bfd_events(enable_disable=0)
2042         except UnexpectedApiReturnValueError:
2043             # some tests aren't subscribed, so this is not an issue
2044             pass
2045         self.vapi.collect_events()  # clear the event queue
2046         super(BFDCLITestCase, self).tearDown()
2047
2048     def cli_verify_no_response(self, cli):
2049         """ execute a CLI, asserting that the response is empty """
2050         self.assert_equal(self.vapi.cli(cli),
2051                           "",
2052                           "CLI command response")
2053
2054     def cli_verify_response(self, cli, expected):
2055         """ execute a CLI, asserting that the response matches expectation """
2056         self.assert_equal(self.vapi.cli(cli).strip(),
2057                           expected,
2058                           "CLI command response")
2059
2060     def test_show(self):
2061         """ show commands """
2062         k1 = self.factory.create_random_key(self)
2063         k1.add_vpp_config()
2064         k2 = self.factory.create_random_key(
2065             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2066         k2.add_vpp_config()
2067         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2068         s1.add_vpp_config()
2069         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2070                               sha1_key=k2)
2071         s2.add_vpp_config()
2072         self.logger.info(self.vapi.ppcli("show bfd keys"))
2073         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2074         self.logger.info(self.vapi.ppcli("show bfd"))
2075
2076     def test_set_del_sha1_key(self):
2077         """ set/delete SHA1 auth key """
2078         k = self.factory.create_random_key(self)
2079         self.registry.register(k, self.logger)
2080         self.cli_verify_no_response(
2081             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2082             (k.conf_key_id,
2083                 "".join("{:02x}".format(ord(c)) for c in k.key)))
2084         self.assertTrue(k.query_vpp_config())
2085         self.vpp_session = VppBFDUDPSession(
2086             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2087         self.vpp_session.add_vpp_config()
2088         self.test_session = \
2089             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2090                            bfd_key_id=self.vpp_session.bfd_key_id)
2091         self.vapi.want_bfd_events()
2092         bfd_session_up(self)
2093         bfd_session_down(self)
2094         # try to replace the secret for the key - should fail because the key
2095         # is in-use
2096         k2 = self.factory.create_random_key(self)
2097         self.cli_verify_response(
2098             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2099             (k.conf_key_id,
2100                 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2101             "bfd key set: `bfd_auth_set_key' API call failed, "
2102             "rv=-103:BFD object in use")
2103         # manipulating the session using old secret should still work
2104         bfd_session_up(self)
2105         bfd_session_down(self)
2106         self.vpp_session.remove_vpp_config()
2107         self.cli_verify_no_response(
2108             "bfd key del conf-key-id %s" % k.conf_key_id)
2109         self.assertFalse(k.query_vpp_config())
2110
2111     def test_set_del_meticulous_sha1_key(self):
2112         """ set/delete meticulous SHA1 auth key """
2113         k = self.factory.create_random_key(
2114             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2115         self.registry.register(k, self.logger)
2116         self.cli_verify_no_response(
2117             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2118             (k.conf_key_id,
2119                 "".join("{:02x}".format(ord(c)) for c in k.key)))
2120         self.assertTrue(k.query_vpp_config())
2121         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2122                                             self.pg0.remote_ip6, af=AF_INET6,
2123                                             sha1_key=k)
2124         self.vpp_session.add_vpp_config()
2125         self.vpp_session.admin_up()
2126         self.test_session = \
2127             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2128                            bfd_key_id=self.vpp_session.bfd_key_id)
2129         self.vapi.want_bfd_events()
2130         bfd_session_up(self)
2131         bfd_session_down(self)
2132         # try to replace the secret for the key - should fail because the key
2133         # is in-use
2134         k2 = self.factory.create_random_key(self)
2135         self.cli_verify_response(
2136             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2137             (k.conf_key_id,
2138                 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2139             "bfd key set: `bfd_auth_set_key' API call failed, "
2140             "rv=-103:BFD object in use")
2141         # manipulating the session using old secret should still work
2142         bfd_session_up(self)
2143         bfd_session_down(self)
2144         self.vpp_session.remove_vpp_config()
2145         self.cli_verify_no_response(
2146             "bfd key del conf-key-id %s" % k.conf_key_id)
2147         self.assertFalse(k.query_vpp_config())
2148
2149     def test_add_mod_del_bfd_udp(self):
2150         """ create/modify/delete IPv4 BFD UDP session """
2151         vpp_session = VppBFDUDPSession(
2152             self, self.pg0, self.pg0.remote_ip4)
2153         self.registry.register(vpp_session, self.logger)
2154         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2155             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2156             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2157                                 self.pg0.remote_ip4,
2158                                 vpp_session.desired_min_tx,
2159                                 vpp_session.required_min_rx,
2160                                 vpp_session.detect_mult)
2161         self.cli_verify_no_response(cli_add_cmd)
2162         # 2nd add should fail
2163         self.cli_verify_response(
2164             cli_add_cmd,
2165             "bfd udp session add: `bfd_add_add_session' API call"
2166             " failed, rv=-101:Duplicate BFD object")
2167         verify_bfd_session_config(self, vpp_session)
2168         mod_session = VppBFDUDPSession(
2169             self, self.pg0, self.pg0.remote_ip4,
2170             required_min_rx=2 * vpp_session.required_min_rx,
2171             desired_min_tx=3 * vpp_session.desired_min_tx,
2172             detect_mult=4 * vpp_session.detect_mult)
2173         self.cli_verify_no_response(
2174             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2175             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2176             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2177              mod_session.desired_min_tx, mod_session.required_min_rx,
2178              mod_session.detect_mult))
2179         verify_bfd_session_config(self, mod_session)
2180         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2181             "peer-addr %s" % (self.pg0.name,
2182                               self.pg0.local_ip4, self.pg0.remote_ip4)
2183         self.cli_verify_no_response(cli_del_cmd)
2184         # 2nd del is expected to fail
2185         self.cli_verify_response(
2186             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2187             " failed, rv=-102:No such BFD object")
2188         self.assertFalse(vpp_session.query_vpp_config())
2189
2190     def test_add_mod_del_bfd_udp6(self):
2191         """ create/modify/delete IPv6 BFD UDP session """
2192         vpp_session = VppBFDUDPSession(
2193             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2194         self.registry.register(vpp_session, self.logger)
2195         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2196             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2197             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2198                                 self.pg0.remote_ip6,
2199                                 vpp_session.desired_min_tx,
2200                                 vpp_session.required_min_rx,
2201                                 vpp_session.detect_mult)
2202         self.cli_verify_no_response(cli_add_cmd)
2203         # 2nd add should fail
2204         self.cli_verify_response(
2205             cli_add_cmd,
2206             "bfd udp session add: `bfd_add_add_session' API call"
2207             " failed, rv=-101:Duplicate BFD object")
2208         verify_bfd_session_config(self, vpp_session)
2209         mod_session = VppBFDUDPSession(
2210             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2211             required_min_rx=2 * vpp_session.required_min_rx,
2212             desired_min_tx=3 * vpp_session.desired_min_tx,
2213             detect_mult=4 * vpp_session.detect_mult)
2214         self.cli_verify_no_response(
2215             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2216             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2217             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2218              mod_session.desired_min_tx,
2219              mod_session.required_min_rx, mod_session.detect_mult))
2220         verify_bfd_session_config(self, mod_session)
2221         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2222             "peer-addr %s" % (self.pg0.name,
2223                               self.pg0.local_ip6, self.pg0.remote_ip6)
2224         self.cli_verify_no_response(cli_del_cmd)
2225         # 2nd del is expected to fail
2226         self.cli_verify_response(
2227             cli_del_cmd,
2228             "bfd udp session del: `bfd_udp_del_session' API call"
2229             " failed, rv=-102:No such BFD object")
2230         self.assertFalse(vpp_session.query_vpp_config())
2231
2232     def test_add_mod_del_bfd_udp_auth(self):
2233         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2234         key = self.factory.create_random_key(self)
2235         key.add_vpp_config()
2236         vpp_session = VppBFDUDPSession(
2237             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2238         self.registry.register(vpp_session, self.logger)
2239         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2240             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2241             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2242             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2243                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2244                vpp_session.detect_mult, key.conf_key_id,
2245                vpp_session.bfd_key_id)
2246         self.cli_verify_no_response(cli_add_cmd)
2247         # 2nd add should fail
2248         self.cli_verify_response(
2249             cli_add_cmd,
2250             "bfd udp session add: `bfd_add_add_session' API call"
2251             " failed, rv=-101:Duplicate BFD object")
2252         verify_bfd_session_config(self, vpp_session)
2253         mod_session = VppBFDUDPSession(
2254             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2255             bfd_key_id=vpp_session.bfd_key_id,
2256             required_min_rx=2 * vpp_session.required_min_rx,
2257             desired_min_tx=3 * vpp_session.desired_min_tx,
2258             detect_mult=4 * vpp_session.detect_mult)
2259         self.cli_verify_no_response(
2260             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2261             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2262             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2263              mod_session.desired_min_tx,
2264              mod_session.required_min_rx, mod_session.detect_mult))
2265         verify_bfd_session_config(self, mod_session)
2266         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2267             "peer-addr %s" % (self.pg0.name,
2268                               self.pg0.local_ip4, self.pg0.remote_ip4)
2269         self.cli_verify_no_response(cli_del_cmd)
2270         # 2nd del is expected to fail
2271         self.cli_verify_response(
2272             cli_del_cmd,
2273             "bfd udp session del: `bfd_udp_del_session' API call"
2274             " failed, rv=-102:No such BFD object")
2275         self.assertFalse(vpp_session.query_vpp_config())
2276
2277     def test_add_mod_del_bfd_udp6_auth(self):
2278         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2279         key = self.factory.create_random_key(
2280             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2281         key.add_vpp_config()
2282         vpp_session = VppBFDUDPSession(
2283             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2284         self.registry.register(vpp_session, self.logger)
2285         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2286             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2287             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2288             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2289                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2290                vpp_session.detect_mult, key.conf_key_id,
2291                vpp_session.bfd_key_id)
2292         self.cli_verify_no_response(cli_add_cmd)
2293         # 2nd add should fail
2294         self.cli_verify_response(
2295             cli_add_cmd,
2296             "bfd udp session add: `bfd_add_add_session' API call"
2297             " failed, rv=-101:Duplicate BFD object")
2298         verify_bfd_session_config(self, vpp_session)
2299         mod_session = VppBFDUDPSession(
2300             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2301             bfd_key_id=vpp_session.bfd_key_id,
2302             required_min_rx=2 * vpp_session.required_min_rx,
2303             desired_min_tx=3 * vpp_session.desired_min_tx,
2304             detect_mult=4 * vpp_session.detect_mult)
2305         self.cli_verify_no_response(
2306             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2307             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2308             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2309              mod_session.desired_min_tx,
2310              mod_session.required_min_rx, mod_session.detect_mult))
2311         verify_bfd_session_config(self, mod_session)
2312         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2313             "peer-addr %s" % (self.pg0.name,
2314                               self.pg0.local_ip6, self.pg0.remote_ip6)
2315         self.cli_verify_no_response(cli_del_cmd)
2316         # 2nd del is expected to fail
2317         self.cli_verify_response(
2318             cli_del_cmd,
2319             "bfd udp session del: `bfd_udp_del_session' API call"
2320             " failed, rv=-102:No such BFD object")
2321         self.assertFalse(vpp_session.query_vpp_config())
2322
2323     def test_auth_on_off(self):
2324         """ turn authentication on and off """
2325         key = self.factory.create_random_key(
2326             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2327         key.add_vpp_config()
2328         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2329         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2330                                         sha1_key=key)
2331         session.add_vpp_config()
2332         cli_activate = \
2333             "bfd udp session auth activate interface %s local-addr %s "\
2334             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2335             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2336                key.conf_key_id, auth_session.bfd_key_id)
2337         self.cli_verify_no_response(cli_activate)
2338         verify_bfd_session_config(self, auth_session)
2339         self.cli_verify_no_response(cli_activate)
2340         verify_bfd_session_config(self, auth_session)
2341         cli_deactivate = \
2342             "bfd udp session auth deactivate interface %s local-addr %s "\
2343             "peer-addr %s "\
2344             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2345         self.cli_verify_no_response(cli_deactivate)
2346         verify_bfd_session_config(self, session)
2347         self.cli_verify_no_response(cli_deactivate)
2348         verify_bfd_session_config(self, session)
2349
2350     def test_auth_on_off_delayed(self):
2351         """ turn authentication on and off (delayed) """
2352         key = self.factory.create_random_key(
2353             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2354         key.add_vpp_config()
2355         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2356         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2357                                         sha1_key=key)
2358         session.add_vpp_config()
2359         cli_activate = \
2360             "bfd udp session auth activate interface %s local-addr %s "\
2361             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2362             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2363                key.conf_key_id, auth_session.bfd_key_id)
2364         self.cli_verify_no_response(cli_activate)
2365         verify_bfd_session_config(self, auth_session)
2366         self.cli_verify_no_response(cli_activate)
2367         verify_bfd_session_config(self, auth_session)
2368         cli_deactivate = \
2369             "bfd udp session auth deactivate interface %s local-addr %s "\
2370             "peer-addr %s delayed yes"\
2371             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2372         self.cli_verify_no_response(cli_deactivate)
2373         verify_bfd_session_config(self, session)
2374         self.cli_verify_no_response(cli_deactivate)
2375         verify_bfd_session_config(self, session)
2376
2377     def test_admin_up_down(self):
2378         """ put session admin-up and admin-down """
2379         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2380         session.add_vpp_config()
2381         cli_down = \
2382             "bfd udp session set-flags admin down interface %s local-addr %s "\
2383             "peer-addr %s "\
2384             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2385         cli_up = \
2386             "bfd udp session set-flags admin up interface %s local-addr %s "\
2387             "peer-addr %s "\
2388             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2389         self.cli_verify_no_response(cli_down)
2390         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2391         self.cli_verify_no_response(cli_up)
2392         verify_bfd_session_config(self, session, state=BFDState.down)
2393
2394     def test_set_del_udp_echo_source(self):
2395         """ set/del udp echo source """
2396         self.create_loopback_interfaces([0])
2397         self.loopback0 = self.lo_interfaces[0]
2398         self.loopback0.admin_up()
2399         self.cli_verify_response("show bfd echo-source",
2400                                  "UDP echo source is not set.")
2401         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2402         self.cli_verify_no_response(cli_set)
2403         self.cli_verify_response("show bfd echo-source",
2404                                  "UDP echo source is: %s\n"
2405                                  "IPv4 address usable as echo source: none\n"
2406                                  "IPv6 address usable as echo source: none" %
2407                                  self.loopback0.name)
2408         self.loopback0.config_ip4()
2409         unpacked = unpack("!L", self.loopback0.local_ip4n)
2410         echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2411         self.cli_verify_response("show bfd echo-source",
2412                                  "UDP echo source is: %s\n"
2413                                  "IPv4 address usable as echo source: %s\n"
2414                                  "IPv6 address usable as echo source: none" %
2415                                  (self.loopback0.name, echo_ip4))
2416         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2417         echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2418                                             unpacked[2], unpacked[3] ^ 1))
2419         self.loopback0.config_ip6()
2420         self.cli_verify_response("show bfd echo-source",
2421                                  "UDP echo source is: %s\n"
2422                                  "IPv4 address usable as echo source: %s\n"
2423                                  "IPv6 address usable as echo source: %s" %
2424                                  (self.loopback0.name, echo_ip4, echo_ip6))
2425         cli_del = "bfd udp echo-source del"
2426         self.cli_verify_no_response(cli_del)
2427         self.cli_verify_response("show bfd echo-source",
2428                                  "UDP echo source is not set.")
2429
2430 if __name__ == '__main__':
2431     unittest.main(testRunner=VppTestRunner)