make test: relax BFD time intervals
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5 import unittest
6 import hashlib
7 import binascii
8 import time
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17     BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError
20 from util import ppp
21 from vpp_papi_provider import UnexpectedApiReturnValueError
22
23 USEC_IN_SEC = 1000000
24
25
26 class AuthKeyFactory(object):
27     """Factory class for creating auth keys with unique conf key ID"""
28
29     def __init__(self):
30         self._conf_key_ids = {}
31
32     def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
33         """ create a random key with unique conf key id """
34         conf_key_id = randint(0, 0xFFFFFFFF)
35         while conf_key_id in self._conf_key_ids:
36             conf_key_id = randint(0, 0xFFFFFFFF)
37         self._conf_key_ids[conf_key_id] = 1
38         key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
39         return VppBFDAuthKey(test=test, auth_type=auth_type,
40                              conf_key_id=conf_key_id, key=key)
41
42
43 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
44 class BFDAPITestCase(VppTestCase):
45     """Bidirectional Forwarding Detection (BFD) - API"""
46
47     pg0 = None
48     pg1 = None
49
50     @classmethod
51     def setUpClass(cls):
52         super(BFDAPITestCase, cls).setUpClass()
53
54         try:
55             cls.create_pg_interfaces(range(2))
56             for i in cls.pg_interfaces:
57                 i.config_ip4()
58                 i.config_ip6()
59                 i.resolve_arp()
60
61         except Exception:
62             super(BFDAPITestCase, cls).tearDownClass()
63             raise
64
65     def setUp(self):
66         super(BFDAPITestCase, self).setUp()
67         self.factory = AuthKeyFactory()
68
69     def test_add_bfd(self):
70         """ create a BFD session """
71         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
72         session.add_vpp_config()
73         self.logger.debug("Session state is %s", session.state)
74         session.remove_vpp_config()
75         session.add_vpp_config()
76         self.logger.debug("Session state is %s", session.state)
77         session.remove_vpp_config()
78
79     def test_double_add(self):
80         """ create the same BFD session twice (negative case) """
81         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
82         session.add_vpp_config()
83
84         with self.vapi.expect_negative_api_retval():
85             session.add_vpp_config()
86
87         session.remove_vpp_config()
88
89     def test_add_bfd6(self):
90         """ create IPv6 BFD session """
91         session = VppBFDUDPSession(
92             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
93         session.add_vpp_config()
94         self.logger.debug("Session state is %s", session.state)
95         session.remove_vpp_config()
96         session.add_vpp_config()
97         self.logger.debug("Session state is %s", session.state)
98         session.remove_vpp_config()
99
100     def test_mod_bfd(self):
101         """ modify BFD session parameters """
102         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
103                                    desired_min_tx=50000,
104                                    required_min_rx=10000,
105                                    detect_mult=1)
106         session.add_vpp_config()
107         s = session.get_bfd_udp_session_dump_entry()
108         self.assert_equal(session.desired_min_tx,
109                           s.desired_min_tx,
110                           "desired min transmit interval")
111         self.assert_equal(session.required_min_rx,
112                           s.required_min_rx,
113                           "required min receive interval")
114         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
115         session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
116                                   required_min_rx=session.required_min_rx * 2,
117                                   detect_mult=session.detect_mult * 2)
118         s = session.get_bfd_udp_session_dump_entry()
119         self.assert_equal(session.desired_min_tx,
120                           s.desired_min_tx,
121                           "desired min transmit interval")
122         self.assert_equal(session.required_min_rx,
123                           s.required_min_rx,
124                           "required min receive interval")
125         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
126
127     def test_add_sha1_keys(self):
128         """ add SHA1 keys """
129         key_count = 10
130         keys = [self.factory.create_random_key(
131             self) for i in range(0, key_count)]
132         for key in keys:
133             self.assertFalse(key.query_vpp_config())
134         for key in keys:
135             key.add_vpp_config()
136         for key in keys:
137             self.assertTrue(key.query_vpp_config())
138         # remove randomly
139         indexes = range(key_count)
140         shuffle(indexes)
141         removed = []
142         for i in indexes:
143             key = keys[i]
144             key.remove_vpp_config()
145             removed.append(i)
146             for j in range(key_count):
147                 key = keys[j]
148                 if j in removed:
149                     self.assertFalse(key.query_vpp_config())
150                 else:
151                     self.assertTrue(key.query_vpp_config())
152         # should be removed now
153         for key in keys:
154             self.assertFalse(key.query_vpp_config())
155         # add back and remove again
156         for key in keys:
157             key.add_vpp_config()
158         for key in keys:
159             self.assertTrue(key.query_vpp_config())
160         for key in keys:
161             key.remove_vpp_config()
162         for key in keys:
163             self.assertFalse(key.query_vpp_config())
164
165     def test_add_bfd_sha1(self):
166         """ create a BFD session (SHA1) """
167         key = self.factory.create_random_key(self)
168         key.add_vpp_config()
169         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
170                                    sha1_key=key)
171         session.add_vpp_config()
172         self.logger.debug("Session state is %s", session.state)
173         session.remove_vpp_config()
174         session.add_vpp_config()
175         self.logger.debug("Session state is %s", session.state)
176         session.remove_vpp_config()
177
178     def test_double_add_sha1(self):
179         """ create the same BFD session twice (negative case) (SHA1) """
180         key = self.factory.create_random_key(self)
181         key.add_vpp_config()
182         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
183                                    sha1_key=key)
184         session.add_vpp_config()
185         with self.assertRaises(Exception):
186             session.add_vpp_config()
187
188     def test_add_auth_nonexistent_key(self):
189         """ create BFD session using non-existent SHA1 (negative case) """
190         session = VppBFDUDPSession(
191             self, self.pg0, self.pg0.remote_ip4,
192             sha1_key=self.factory.create_random_key(self))
193         with self.assertRaises(Exception):
194             session.add_vpp_config()
195
196     def test_shared_sha1_key(self):
197         """ share single SHA1 key between multiple BFD sessions """
198         key = self.factory.create_random_key(self)
199         key.add_vpp_config()
200         sessions = [
201             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
202                              sha1_key=key),
203             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
204                              sha1_key=key, af=AF_INET6),
205             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
206                              sha1_key=key),
207             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
208                              sha1_key=key, af=AF_INET6)]
209         for s in sessions:
210             s.add_vpp_config()
211         removed = 0
212         for s in sessions:
213             e = key.get_bfd_auth_keys_dump_entry()
214             self.assert_equal(e.use_count, len(sessions) - removed,
215                               "Use count for shared key")
216             s.remove_vpp_config()
217             removed += 1
218         e = key.get_bfd_auth_keys_dump_entry()
219         self.assert_equal(e.use_count, len(sessions) - removed,
220                           "Use count for shared key")
221
222     def test_activate_auth(self):
223         """ activate SHA1 authentication """
224         key = self.factory.create_random_key(self)
225         key.add_vpp_config()
226         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
227         session.add_vpp_config()
228         session.activate_auth(key)
229
230     def test_deactivate_auth(self):
231         """ deactivate SHA1 authentication """
232         key = self.factory.create_random_key(self)
233         key.add_vpp_config()
234         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
235         session.add_vpp_config()
236         session.activate_auth(key)
237         session.deactivate_auth()
238
239     def test_change_key(self):
240         """ change SHA1 key """
241         key1 = self.factory.create_random_key(self)
242         key2 = self.factory.create_random_key(self)
243         while key2.conf_key_id == key1.conf_key_id:
244             key2 = self.factory.create_random_key(self)
245         key1.add_vpp_config()
246         key2.add_vpp_config()
247         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
248                                    sha1_key=key1)
249         session.add_vpp_config()
250         session.activate_auth(key2)
251
252
253 class BFDTestSession(object):
254     """ BFD session as seen from test framework side """
255
256     def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
257                  bfd_key_id=None, our_seq_number=None):
258         self.test = test
259         self.af = af
260         self.sha1_key = sha1_key
261         self.bfd_key_id = bfd_key_id
262         self.interface = interface
263         self.udp_sport = randint(49152, 65535)
264         if our_seq_number is None:
265             self.our_seq_number = randint(0, 40000000)
266         else:
267             self.our_seq_number = our_seq_number
268         self.vpp_seq_number = None
269         self.my_discriminator = 0
270         self.desired_min_tx = 300000
271         self.required_min_rx = 300000
272         self.required_min_echo_rx = None
273         self.detect_mult = detect_mult
274         self.diag = BFDDiagCode.no_diagnostic
275         self.your_discriminator = None
276         self.state = BFDState.down
277         self.auth_type = BFDAuthType.no_auth
278
279     def inc_seq_num(self):
280         """ increment sequence number, wrapping if needed """
281         if self.our_seq_number == 0xFFFFFFFF:
282             self.our_seq_number = 0
283         else:
284             self.our_seq_number += 1
285
286     def update(self, my_discriminator=None, your_discriminator=None,
287                desired_min_tx=None, required_min_rx=None,
288                required_min_echo_rx=None, detect_mult=None,
289                diag=None, state=None, auth_type=None):
290         """ update BFD parameters associated with session """
291         if my_discriminator is not None:
292             self.my_discriminator = my_discriminator
293         if your_discriminator is not None:
294             self.your_discriminator = your_discriminator
295         if required_min_rx is not None:
296             self.required_min_rx = required_min_rx
297         if required_min_echo_rx is not None:
298             self.required_min_echo_rx = required_min_echo_rx
299         if desired_min_tx is not None:
300             self.desired_min_tx = desired_min_tx
301         if detect_mult is not None:
302             self.detect_mult = detect_mult
303         if diag is not None:
304             self.diag = diag
305         if state is not None:
306             self.state = state
307         if auth_type is not None:
308             self.auth_type = auth_type
309
310     def fill_packet_fields(self, packet):
311         """ set packet fields with known values in packet """
312         bfd = packet[BFD]
313         if self.my_discriminator:
314             self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
315                                    self.my_discriminator)
316             bfd.my_discriminator = self.my_discriminator
317         if self.your_discriminator:
318             self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
319                                    self.your_discriminator)
320             bfd.your_discriminator = self.your_discriminator
321         if self.required_min_rx:
322             self.test.logger.debug(
323                 "BFD: setting packet.required_min_rx_interval=%s",
324                 self.required_min_rx)
325             bfd.required_min_rx_interval = self.required_min_rx
326         if self.required_min_echo_rx:
327             self.test.logger.debug(
328                 "BFD: setting packet.required_min_echo_rx=%s",
329                 self.required_min_echo_rx)
330             bfd.required_min_echo_rx_interval = self.required_min_echo_rx
331         if self.desired_min_tx:
332             self.test.logger.debug(
333                 "BFD: setting packet.desired_min_tx_interval=%s",
334                 self.desired_min_tx)
335             bfd.desired_min_tx_interval = self.desired_min_tx
336         if self.detect_mult:
337             self.test.logger.debug(
338                 "BFD: setting packet.detect_mult=%s", self.detect_mult)
339             bfd.detect_mult = self.detect_mult
340         if self.diag:
341             self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
342             bfd.diag = self.diag
343         if self.state:
344             self.test.logger.debug("BFD: setting packet.state=%s", self.state)
345             bfd.state = self.state
346         if self.auth_type:
347             # this is used by a negative test-case
348             self.test.logger.debug("BFD: setting packet.auth_type=%s",
349                                    self.auth_type)
350             bfd.auth_type = self.auth_type
351
352     def create_packet(self):
353         """ create a BFD packet, reflecting the current state of session """
354         if self.sha1_key:
355             bfd = BFD(flags="A")
356             bfd.auth_type = self.sha1_key.auth_type
357             bfd.auth_len = BFD.sha1_auth_len
358             bfd.auth_key_id = self.bfd_key_id
359             bfd.auth_seq_num = self.our_seq_number
360             bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
361         else:
362             bfd = BFD()
363         if self.af == AF_INET6:
364             packet = (Ether(src=self.interface.remote_mac,
365                             dst=self.interface.local_mac) /
366                       IPv6(src=self.interface.remote_ip6,
367                            dst=self.interface.local_ip6,
368                            hlim=255) /
369                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
370                       bfd)
371         else:
372             packet = (Ether(src=self.interface.remote_mac,
373                             dst=self.interface.local_mac) /
374                       IP(src=self.interface.remote_ip4,
375                          dst=self.interface.local_ip4,
376                          ttl=255) /
377                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
378                       bfd)
379         self.test.logger.debug("BFD: Creating packet")
380         self.fill_packet_fields(packet)
381         if self.sha1_key:
382             hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
383                 "\0" * (20 - len(self.sha1_key.key))
384             self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
385                                    hashlib.sha1(hash_material).hexdigest())
386             packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
387         return packet
388
389     def send_packet(self, packet=None, interface=None):
390         """ send packet on interface, creating the packet if needed """
391         if packet is None:
392             packet = self.create_packet()
393         if interface is None:
394             interface = self.test.pg0
395         self.test.logger.debug(ppp("Sending packet:", packet))
396         interface.add_stream(packet)
397         self.test.pg_start()
398
399     def verify_sha1_auth(self, packet):
400         """ Verify correctness of authentication in BFD layer. """
401         bfd = packet[BFD]
402         self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
403         self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
404                                BFDAuthType)
405         self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
406         self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
407         if self.vpp_seq_number is None:
408             self.vpp_seq_number = bfd.auth_seq_num
409             self.test.logger.debug("Received initial sequence number: %s" %
410                                    self.vpp_seq_number)
411         else:
412             recvd_seq_num = bfd.auth_seq_num
413             self.test.logger.debug("Received followup sequence number: %s" %
414                                    recvd_seq_num)
415             if self.vpp_seq_number < 0xffffffff:
416                 if self.sha1_key.auth_type == \
417                         BFDAuthType.meticulous_keyed_sha1:
418                     self.test.assert_equal(recvd_seq_num,
419                                            self.vpp_seq_number + 1,
420                                            "BFD sequence number")
421                 else:
422                     self.test.assert_in_range(recvd_seq_num,
423                                               self.vpp_seq_number,
424                                               self.vpp_seq_number + 1,
425                                               "BFD sequence number")
426             else:
427                 if self.sha1_key.auth_type == \
428                         BFDAuthType.meticulous_keyed_sha1:
429                     self.test.assert_equal(recvd_seq_num, 0,
430                                            "BFD sequence number")
431                 else:
432                     self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
433                                        "BFD sequence number not one of "
434                                        "(%s, 0)" % self.vpp_seq_number)
435             self.vpp_seq_number = recvd_seq_num
436         # last 20 bytes represent the hash - so replace them with the key,
437         # pad the result with zeros and hash the result
438         hash_material = bfd.original[:-20] + self.sha1_key.key + \
439             "\0" * (20 - len(self.sha1_key.key))
440         expected_hash = hashlib.sha1(hash_material).hexdigest()
441         self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
442                                expected_hash, "Auth key hash")
443
444     def verify_bfd(self, packet):
445         """ Verify correctness of BFD layer. """
446         bfd = packet[BFD]
447         self.test.assert_equal(bfd.version, 1, "BFD version")
448         self.test.assert_equal(bfd.your_discriminator,
449                                self.my_discriminator,
450                                "BFD - your discriminator")
451         if self.sha1_key:
452             self.verify_sha1_auth(packet)
453
454
455 def bfd_session_up(test):
456     """ Bring BFD session up """
457     test.logger.info("BFD: Waiting for slow hello")
458     p = wait_for_bfd_packet(test, 2)
459     old_offset = None
460     if hasattr(test, 'vpp_clock_offset'):
461         old_offset = test.vpp_clock_offset
462     test.vpp_clock_offset = time.time() - p.time
463     test.logger.debug("BFD: Calculated vpp clock offset: %s",
464                       test.vpp_clock_offset)
465     if old_offset:
466         test.assertAlmostEqual(
467             old_offset, test.vpp_clock_offset, delta=0.5,
468             msg="vpp clock offset not stable (new: %s, old: %s)" %
469             (test.vpp_clock_offset, old_offset))
470     test.logger.info("BFD: Sending Init")
471     test.test_session.update(my_discriminator=randint(0, 40000000),
472                              your_discriminator=p[BFD].my_discriminator,
473                              state=BFDState.init)
474     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
475             BFDAuthType.meticulous_keyed_sha1:
476         test.test_session.inc_seq_num()
477     test.test_session.send_packet()
478     test.logger.info("BFD: Waiting for event")
479     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
480     verify_event(test, e, expected_state=BFDState.up)
481     test.logger.info("BFD: Session is Up")
482     test.test_session.update(state=BFDState.up)
483     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
484             BFDAuthType.meticulous_keyed_sha1:
485         test.test_session.inc_seq_num()
486     test.test_session.send_packet()
487     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
488
489
490 def bfd_session_down(test):
491     """ Bring BFD session down """
492     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
493     test.test_session.update(state=BFDState.down)
494     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
495             BFDAuthType.meticulous_keyed_sha1:
496         test.test_session.inc_seq_num()
497     test.test_session.send_packet()
498     test.logger.info("BFD: Waiting for event")
499     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
500     verify_event(test, e, expected_state=BFDState.down)
501     test.logger.info("BFD: Session is Down")
502     test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
503
504
505 def verify_bfd_session_config(test, session, state=None):
506     dump = session.get_bfd_udp_session_dump_entry()
507     test.assertIsNotNone(dump)
508     # since dump is not none, we have verified that sw_if_index and addresses
509     # are valid (in get_bfd_udp_session_dump_entry)
510     if state:
511         test.assert_equal(dump.state, state, "session state")
512     test.assert_equal(dump.required_min_rx, session.required_min_rx,
513                       "required min rx interval")
514     test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
515                       "desired min tx interval")
516     test.assert_equal(dump.detect_mult, session.detect_mult,
517                       "detect multiplier")
518     if session.sha1_key is None:
519         test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
520     else:
521         test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
522         test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
523                           "bfd key id")
524         test.assert_equal(dump.conf_key_id,
525                           session.sha1_key.conf_key_id,
526                           "config key id")
527
528
529 def verify_ip(test, packet):
530     """ Verify correctness of IP layer. """
531     if test.vpp_session.af == AF_INET6:
532         ip = packet[IPv6]
533         local_ip = test.pg0.local_ip6
534         remote_ip = test.pg0.remote_ip6
535         test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
536     else:
537         ip = packet[IP]
538         local_ip = test.pg0.local_ip4
539         remote_ip = test.pg0.remote_ip4
540         test.assert_equal(ip.ttl, 255, "IPv4 TTL")
541     test.assert_equal(ip.src, local_ip, "IP source address")
542     test.assert_equal(ip.dst, remote_ip, "IP destination address")
543
544
545 def verify_udp(test, packet):
546     """ Verify correctness of UDP layer. """
547     udp = packet[UDP]
548     test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
549     test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
550                          "UDP source port")
551
552
553 def verify_event(test, event, expected_state):
554     """ Verify correctness of event values. """
555     e = event
556     test.logger.debug("BFD: Event: %s" % repr(e))
557     test.assert_equal(e.sw_if_index,
558                       test.vpp_session.interface.sw_if_index,
559                       "BFD interface index")
560     is_ipv6 = 0
561     if test.vpp_session.af == AF_INET6:
562         is_ipv6 = 1
563     test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
564     if test.vpp_session.af == AF_INET:
565         test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
566                           "Local IPv4 address")
567         test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
568                           "Peer IPv4 address")
569     else:
570         test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
571                           "Local IPv6 address")
572         test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
573                           "Peer IPv6 address")
574     test.assert_equal(e.state, expected_state, BFDState)
575
576
577 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
578     """ wait for BFD packet and verify its correctness
579
580     :param timeout: how long to wait
581     :param pcap_time_min: ignore packets with pcap timestamp lower than this
582
583     :returns: tuple (packet, time spent waiting for packet)
584     """
585     test.logger.info("BFD: Waiting for BFD packet")
586     deadline = time.time() + timeout
587     counter = 0
588     while True:
589         counter += 1
590         # sanity check
591         test.assert_in_range(counter, 0, 100, "number of packets ignored")
592         time_left = deadline - time.time()
593         if time_left < 0:
594             raise CaptureTimeoutError("Packet did not arrive within timeout")
595         p = test.pg0.wait_for_packet(timeout=time_left)
596         test.logger.debug(ppp("BFD: Got packet:", p))
597         if pcap_time_min is not None and p.time < pcap_time_min:
598             test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
599                                   "pcap time min %s):" %
600                                   (p.time, pcap_time_min), p))
601         else:
602             break
603     bfd = p[BFD]
604     if bfd is None:
605         raise Exception(ppp("Unexpected or invalid BFD packet:", p))
606     if bfd.payload:
607         raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
608     verify_ip(test, p)
609     verify_udp(test, p)
610     test.test_session.verify_bfd(p)
611     return p
612
613
614 class BFD4TestCase(VppTestCase):
615     """Bidirectional Forwarding Detection (BFD)"""
616
617     pg0 = None
618     vpp_clock_offset = None
619     vpp_session = None
620     test_session = None
621
622     @classmethod
623     def setUpClass(cls):
624         super(BFD4TestCase, cls).setUpClass()
625         try:
626             cls.create_pg_interfaces([0])
627             cls.create_loopback_interfaces([0])
628             cls.loopback0 = cls.lo_interfaces[0]
629             cls.loopback0.config_ip4()
630             cls.loopback0.admin_up()
631             cls.pg0.config_ip4()
632             cls.pg0.configure_ipv4_neighbors()
633             cls.pg0.admin_up()
634             cls.pg0.resolve_arp()
635
636         except Exception:
637             super(BFD4TestCase, cls).tearDownClass()
638             raise
639
640     def setUp(self):
641         super(BFD4TestCase, self).setUp()
642         self.factory = AuthKeyFactory()
643         self.vapi.want_bfd_events()
644         self.pg0.enable_capture()
645         try:
646             self.vpp_session = VppBFDUDPSession(self, self.pg0,
647                                                 self.pg0.remote_ip4)
648             self.vpp_session.add_vpp_config()
649             self.vpp_session.admin_up()
650             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
651         except:
652             self.vapi.want_bfd_events(enable_disable=0)
653             raise
654
655     def tearDown(self):
656         if not self.vpp_dead:
657             self.vapi.want_bfd_events(enable_disable=0)
658         self.vapi.collect_events()  # clear the event queue
659         super(BFD4TestCase, self).tearDown()
660
661     def test_session_up(self):
662         """ bring BFD session up """
663         bfd_session_up(self)
664
665     def test_session_up_by_ip(self):
666         """ bring BFD session up - first frame looked up by address pair """
667         self.logger.info("BFD: Sending Slow control frame")
668         self.test_session.update(my_discriminator=randint(0, 40000000))
669         self.test_session.send_packet()
670         self.pg0.enable_capture()
671         p = self.pg0.wait_for_packet(1)
672         self.assert_equal(p[BFD].your_discriminator,
673                           self.test_session.my_discriminator,
674                           "BFD - your discriminator")
675         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
676         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
677                                  state=BFDState.up)
678         self.logger.info("BFD: Waiting for event")
679         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
680         verify_event(self, e, expected_state=BFDState.init)
681         self.logger.info("BFD: Sending Up")
682         self.test_session.send_packet()
683         self.logger.info("BFD: Waiting for event")
684         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
685         verify_event(self, e, expected_state=BFDState.up)
686         self.logger.info("BFD: Session is Up")
687         self.test_session.update(state=BFDState.up)
688         self.test_session.send_packet()
689         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
690
691     def test_session_down(self):
692         """ bring BFD session down """
693         bfd_session_up(self)
694         bfd_session_down(self)
695
696     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
697     def test_hold_up(self):
698         """ hold BFD session up """
699         bfd_session_up(self)
700         for dummy in range(self.test_session.detect_mult * 2):
701             wait_for_bfd_packet(self)
702             self.test_session.send_packet()
703         self.assert_equal(len(self.vapi.collect_events()), 0,
704                           "number of bfd events")
705
706     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
707     def test_slow_timer(self):
708         """ verify slow periodic control frames while session down """
709         packet_count = 3
710         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
711         prev_packet = wait_for_bfd_packet(self, 2)
712         for dummy in range(packet_count):
713             next_packet = wait_for_bfd_packet(self, 2)
714             time_diff = next_packet.time - prev_packet.time
715             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
716             # to work around timing issues
717             self.assert_in_range(
718                 time_diff, 0.70, 1.05, "time between slow packets")
719             prev_packet = next_packet
720
721     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
722     def test_zero_remote_min_rx(self):
723         """ no packets when zero remote required min rx interval """
724         bfd_session_up(self)
725         self.test_session.update(required_min_rx=0)
726         self.test_session.send_packet()
727         for dummy in range(self.test_session.detect_mult):
728             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
729                        "sleep before transmitting bfd packet")
730             self.test_session.send_packet()
731             try:
732                 p = wait_for_bfd_packet(self, timeout=0)
733                 self.logger.error(ppp("Received unexpected packet:", p))
734             except CaptureTimeoutError:
735                 pass
736         self.assert_equal(
737             len(self.vapi.collect_events()), 0, "number of bfd events")
738         self.test_session.update(required_min_rx=300000)
739         for dummy in range(3):
740             self.test_session.send_packet()
741             wait_for_bfd_packet(
742                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
743         self.assert_equal(
744             len(self.vapi.collect_events()), 0, "number of bfd events")
745
746     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
747     def test_conn_down(self):
748         """ verify session goes down after inactivity """
749         bfd_session_up(self)
750         detection_time = self.test_session.detect_mult *\
751             self.vpp_session.required_min_rx / USEC_IN_SEC
752         self.sleep(detection_time, "waiting for BFD session time-out")
753         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
754         verify_event(self, e, expected_state=BFDState.down)
755
756     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
757     def test_large_required_min_rx(self):
758         """ large remote required min rx interval """
759         bfd_session_up(self)
760         p = wait_for_bfd_packet(self)
761         interval = 3000000
762         self.test_session.update(required_min_rx=interval)
763         self.test_session.send_packet()
764         time_mark = time.time()
765         count = 0
766         # busy wait here, trying to collect a packet or event, vpp is not
767         # allowed to send packets and the session will timeout first - so the
768         # Up->Down event must arrive before any packets do
769         while time.time() < time_mark + interval / USEC_IN_SEC:
770             try:
771                 p = wait_for_bfd_packet(self, timeout=0)
772                 # if vpp managed to send a packet before we did the session
773                 # session update, then that's fine, ignore it
774                 if p.time < time_mark - self.vpp_clock_offset:
775                     continue
776                 self.logger.error(ppp("Received unexpected packet:", p))
777                 count += 1
778             except CaptureTimeoutError:
779                 pass
780             events = self.vapi.collect_events()
781             if len(events) > 0:
782                 verify_event(self, events[0], BFDState.down)
783                 break
784         self.assert_equal(count, 0, "number of packets received")
785
786     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
787     def test_immediate_remote_min_rx_reduction(self):
788         """ immediately honor remote required min rx reduction """
789         self.vpp_session.remove_vpp_config()
790         self.vpp_session = VppBFDUDPSession(
791             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
792         self.pg0.enable_capture()
793         self.vpp_session.add_vpp_config()
794         self.test_session.update(desired_min_tx=1000000,
795                                  required_min_rx=1000000)
796         bfd_session_up(self)
797         reference_packet = wait_for_bfd_packet(self)
798         time_mark = time.time()
799         interval = 300000
800         self.test_session.update(required_min_rx=interval)
801         self.test_session.send_packet()
802         extra_time = time.time() - time_mark
803         p = wait_for_bfd_packet(self)
804         # first packet is allowed to be late by time we spent doing the update
805         # calculated in extra_time
806         self.assert_in_range(p.time - reference_packet.time,
807                              .95 * 0.75 * interval / USEC_IN_SEC,
808                              1.05 * interval / USEC_IN_SEC + extra_time,
809                              "time between BFD packets")
810         reference_packet = p
811         for dummy in range(3):
812             p = wait_for_bfd_packet(self)
813             diff = p.time - reference_packet.time
814             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
815                                  1.05 * interval / USEC_IN_SEC,
816                                  "time between BFD packets")
817             reference_packet = p
818
819     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
820     def test_modify_req_min_rx_double(self):
821         """ modify session - double required min rx """
822         bfd_session_up(self)
823         p = wait_for_bfd_packet(self)
824         self.test_session.update(desired_min_tx=10000,
825                                  required_min_rx=10000)
826         self.test_session.send_packet()
827         # double required min rx
828         self.vpp_session.modify_parameters(
829             required_min_rx=2 * self.vpp_session.required_min_rx)
830         p = wait_for_bfd_packet(
831             self, pcap_time_min=time.time() - self.vpp_clock_offset)
832         # poll bit needs to be set
833         self.assertIn("P", p.sprintf("%BFD.flags%"),
834                       "Poll bit not set in BFD packet")
835         # finish poll sequence with final packet
836         final = self.test_session.create_packet()
837         final[BFD].flags = "F"
838         timeout = self.test_session.detect_mult * \
839             max(self.test_session.desired_min_tx,
840                 self.vpp_session.required_min_rx) / USEC_IN_SEC
841         self.test_session.send_packet(final)
842         time_mark = time.time()
843         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
844         verify_event(self, e, expected_state=BFDState.down)
845         time_to_event = time.time() - time_mark
846         self.assert_in_range(time_to_event, .9 * timeout,
847                              1.1 * timeout, "session timeout")
848
849     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
850     def test_modify_req_min_rx_halve(self):
851         """ modify session - halve required min rx """
852         self.vpp_session.modify_parameters(
853             required_min_rx=2 * self.vpp_session.required_min_rx)
854         bfd_session_up(self)
855         p = wait_for_bfd_packet(self)
856         self.test_session.update(desired_min_tx=10000,
857                                  required_min_rx=10000)
858         self.test_session.send_packet()
859         p = wait_for_bfd_packet(
860             self, pcap_time_min=time.time() - self.vpp_clock_offset)
861         # halve required min rx
862         old_required_min_rx = self.vpp_session.required_min_rx
863         self.vpp_session.modify_parameters(
864             required_min_rx=0.5 * self.vpp_session.required_min_rx)
865         # now we wait 0.8*3*old-req-min-rx and the session should still be up
866         self.sleep(0.8 * self.vpp_session.detect_mult *
867                    old_required_min_rx / USEC_IN_SEC,
868                    "wait before finishing poll sequence")
869         self.assert_equal(len(self.vapi.collect_events()), 0,
870                           "number of bfd events")
871         p = wait_for_bfd_packet(self)
872         # poll bit needs to be set
873         self.assertIn("P", p.sprintf("%BFD.flags%"),
874                       "Poll bit not set in BFD packet")
875         # finish poll sequence with final packet
876         final = self.test_session.create_packet()
877         final[BFD].flags = "F"
878         self.test_session.send_packet(final)
879         # now the session should time out under new conditions
880         detection_time = self.test_session.detect_mult *\
881             self.vpp_session.required_min_rx / USEC_IN_SEC
882         before = time.time()
883         e = self.vapi.wait_for_event(
884             2 * detection_time, "bfd_udp_session_details")
885         after = time.time()
886         self.assert_in_range(after - before,
887                              0.9 * detection_time,
888                              1.1 * detection_time,
889                              "time before bfd session goes down")
890         verify_event(self, e, expected_state=BFDState.down)
891
892     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
893     def test_modify_detect_mult(self):
894         """ modify detect multiplier """
895         bfd_session_up(self)
896         p = wait_for_bfd_packet(self)
897         self.vpp_session.modify_parameters(detect_mult=1)
898         p = wait_for_bfd_packet(
899             self, pcap_time_min=time.time() - self.vpp_clock_offset)
900         self.assert_equal(self.vpp_session.detect_mult,
901                           p[BFD].detect_mult,
902                           "detect mult")
903         # poll bit must not be set
904         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
905                          "Poll bit not set in BFD packet")
906         self.vpp_session.modify_parameters(detect_mult=10)
907         p = wait_for_bfd_packet(
908             self, pcap_time_min=time.time() - self.vpp_clock_offset)
909         self.assert_equal(self.vpp_session.detect_mult,
910                           p[BFD].detect_mult,
911                           "detect mult")
912         # poll bit must not be set
913         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
914                          "Poll bit not set in BFD packet")
915
916     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
917     def test_queued_poll(self):
918         """ test poll sequence queueing """
919         bfd_session_up(self)
920         p = wait_for_bfd_packet(self)
921         self.vpp_session.modify_parameters(
922             required_min_rx=2 * self.vpp_session.required_min_rx)
923         p = wait_for_bfd_packet(self)
924         poll_sequence_start = time.time()
925         poll_sequence_length_min = 0.5
926         send_final_after = time.time() + poll_sequence_length_min
927         # poll bit needs to be set
928         self.assertIn("P", p.sprintf("%BFD.flags%"),
929                       "Poll bit not set in BFD packet")
930         self.assert_equal(p[BFD].required_min_rx_interval,
931                           self.vpp_session.required_min_rx,
932                           "BFD required min rx interval")
933         self.vpp_session.modify_parameters(
934             required_min_rx=2 * self.vpp_session.required_min_rx)
935         # 2nd poll sequence should be queued now
936         # don't send the reply back yet, wait for some time to emulate
937         # longer round-trip time
938         packet_count = 0
939         while time.time() < send_final_after:
940             self.test_session.send_packet()
941             p = wait_for_bfd_packet(self)
942             self.assert_equal(len(self.vapi.collect_events()), 0,
943                               "number of bfd events")
944             self.assert_equal(p[BFD].required_min_rx_interval,
945                               self.vpp_session.required_min_rx,
946                               "BFD required min rx interval")
947             packet_count += 1
948             # poll bit must be set
949             self.assertIn("P", p.sprintf("%BFD.flags%"),
950                           "Poll bit not set in BFD packet")
951         final = self.test_session.create_packet()
952         final[BFD].flags = "F"
953         self.test_session.send_packet(final)
954         # finish 1st with final
955         poll_sequence_length = time.time() - poll_sequence_start
956         # vpp must wait for some time before starting new poll sequence
957         poll_no_2_started = False
958         for dummy in range(2 * packet_count):
959             p = wait_for_bfd_packet(self)
960             self.assert_equal(len(self.vapi.collect_events()), 0,
961                               "number of bfd events")
962             if "P" in p.sprintf("%BFD.flags%"):
963                 poll_no_2_started = True
964                 if time.time() < poll_sequence_start + poll_sequence_length:
965                     raise Exception("VPP started 2nd poll sequence too soon")
966                 final = self.test_session.create_packet()
967                 final[BFD].flags = "F"
968                 self.test_session.send_packet(final)
969                 break
970             else:
971                 self.test_session.send_packet()
972         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
973         # finish 2nd with final
974         final = self.test_session.create_packet()
975         final[BFD].flags = "F"
976         self.test_session.send_packet(final)
977         p = wait_for_bfd_packet(self)
978         # poll bit must not be set
979         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
980                          "Poll bit set in BFD packet")
981
982     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
983     def test_poll_response(self):
984         """ test correct response to control frame with poll bit set """
985         bfd_session_up(self)
986         poll = self.test_session.create_packet()
987         poll[BFD].flags = "P"
988         self.test_session.send_packet(poll)
989         final = wait_for_bfd_packet(
990             self, pcap_time_min=time.time() - self.vpp_clock_offset)
991         self.assertIn("F", final.sprintf("%BFD.flags%"))
992
993     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
994     def test_no_periodic_if_remote_demand(self):
995         """ no periodic frames outside poll sequence if remote demand set """
996         bfd_session_up(self)
997         demand = self.test_session.create_packet()
998         demand[BFD].flags = "D"
999         self.test_session.send_packet(demand)
1000         transmit_time = 0.9 \
1001             * max(self.vpp_session.required_min_rx,
1002                   self.test_session.desired_min_tx) \
1003             / USEC_IN_SEC
1004         count = 0
1005         for dummy in range(self.test_session.detect_mult * 2):
1006             time.sleep(transmit_time)
1007             self.test_session.send_packet(demand)
1008             try:
1009                 p = wait_for_bfd_packet(self, timeout=0)
1010                 self.logger.error(ppp("Received unexpected packet:", p))
1011                 count += 1
1012             except CaptureTimeoutError:
1013                 pass
1014         events = self.vapi.collect_events()
1015         for e in events:
1016             self.logger.error("Received unexpected event: %s", e)
1017         self.assert_equal(count, 0, "number of packets received")
1018         self.assert_equal(len(events), 0, "number of events received")
1019
1020     def test_echo_looped_back(self):
1021         """ echo packets looped back """
1022         # don't need a session in this case..
1023         self.vpp_session.remove_vpp_config()
1024         self.pg0.enable_capture()
1025         echo_packet_count = 10
1026         # random source port low enough to increment a few times..
1027         udp_sport_tx = randint(1, 50000)
1028         udp_sport_rx = udp_sport_tx
1029         echo_packet = (Ether(src=self.pg0.remote_mac,
1030                              dst=self.pg0.local_mac) /
1031                        IP(src=self.pg0.remote_ip4,
1032                           dst=self.pg0.remote_ip4) /
1033                        UDP(dport=BFD.udp_dport_echo) /
1034                        Raw("this should be looped back"))
1035         for dummy in range(echo_packet_count):
1036             self.sleep(.01, "delay between echo packets")
1037             echo_packet[UDP].sport = udp_sport_tx
1038             udp_sport_tx += 1
1039             self.logger.debug(ppp("Sending packet:", echo_packet))
1040             self.pg0.add_stream(echo_packet)
1041             self.pg_start()
1042         for dummy in range(echo_packet_count):
1043             p = self.pg0.wait_for_packet(1)
1044             self.logger.debug(ppp("Got packet:", p))
1045             ether = p[Ether]
1046             self.assert_equal(self.pg0.remote_mac,
1047                               ether.dst, "Destination MAC")
1048             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1049             ip = p[IP]
1050             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1051             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1052             udp = p[UDP]
1053             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1054                               "UDP destination port")
1055             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1056             udp_sport_rx += 1
1057             # need to compare the hex payload here, otherwise BFD_vpp_echo
1058             # gets in way
1059             self.assertEqual(str(p[UDP].payload),
1060                              str(echo_packet[UDP].payload),
1061                              "Received packet is not the echo packet sent")
1062         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1063                           "ECHO packet identifier for test purposes)")
1064
1065     def test_echo(self):
1066         """ echo function """
1067         bfd_session_up(self)
1068         self.test_session.update(required_min_echo_rx=50000)
1069         self.test_session.send_packet()
1070         detection_time = self.test_session.detect_mult *\
1071             self.vpp_session.required_min_rx / USEC_IN_SEC
1072         # echo shouldn't work without echo source set
1073         for dummy in range(3):
1074             sleep = 0.75 * detection_time
1075             self.sleep(sleep, "delay before sending bfd packet")
1076             self.test_session.send_packet()
1077         p = wait_for_bfd_packet(
1078             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1079         self.assert_equal(p[BFD].required_min_rx_interval,
1080                           self.vpp_session.required_min_rx,
1081                           "BFD required min rx interval")
1082         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1083         # should be turned on - loopback echo packets
1084         for dummy in range(3):
1085             loop_until = time.time() + 0.75 * detection_time
1086             while time.time() < loop_until:
1087                 p = self.pg0.wait_for_packet(1)
1088                 self.logger.debug(ppp("Got packet:", p))
1089                 if p[UDP].dport == BFD.udp_dport_echo:
1090                     self.assert_equal(
1091                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1092                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1093                                         "BFD ECHO src IP equal to loopback IP")
1094                     self.logger.debug(ppp("Looping back packet:", p))
1095                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1096                                       "ECHO packet destination MAC address")
1097                     p[Ether].dst = self.pg0.local_mac
1098                     self.pg0.add_stream(p)
1099                     self.pg_start()
1100                 elif p.haslayer(BFD):
1101                     self.assertGreaterEqual(p[BFD].required_min_rx_interval,
1102                                             1000000)
1103                     if "P" in p.sprintf("%BFD.flags%"):
1104                         final = self.test_session.create_packet()
1105                         final[BFD].flags = "F"
1106                         self.test_session.send_packet(final)
1107                 else:
1108                     raise Exception(ppp("Received unknown packet:", p))
1109
1110                 self.assert_equal(len(self.vapi.collect_events()), 0,
1111                                   "number of bfd events")
1112             self.test_session.send_packet()
1113
1114     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1115     def test_echo_fail(self):
1116         """ session goes down if echo function fails """
1117         bfd_session_up(self)
1118         self.test_session.update(required_min_echo_rx=50000)
1119         self.test_session.send_packet()
1120         detection_time = self.test_session.detect_mult *\
1121             self.vpp_session.required_min_rx / USEC_IN_SEC
1122         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1123         # echo function should be used now, but we will drop the echo packets
1124         verified_diag = False
1125         for dummy in range(3):
1126             loop_until = time.time() + 0.75 * detection_time
1127             while time.time() < loop_until:
1128                 p = self.pg0.wait_for_packet(1)
1129                 self.logger.debug(ppp("Got packet:", p))
1130                 if p[UDP].dport == BFD.udp_dport_echo:
1131                     # dropped
1132                     pass
1133                 elif p.haslayer(BFD):
1134                     if "P" in p.sprintf("%BFD.flags%"):
1135                         self.assertGreaterEqual(
1136                             p[BFD].required_min_rx_interval,
1137                             1000000)
1138                         final = self.test_session.create_packet()
1139                         final[BFD].flags = "F"
1140                         self.test_session.send_packet(final)
1141                     if p[BFD].state == BFDState.down:
1142                         self.assert_equal(p[BFD].diag,
1143                                           BFDDiagCode.echo_function_failed,
1144                                           BFDDiagCode)
1145                         verified_diag = True
1146                 else:
1147                     raise Exception(ppp("Received unknown packet:", p))
1148             self.test_session.send_packet()
1149         events = self.vapi.collect_events()
1150         self.assert_equal(len(events), 1, "number of bfd events")
1151         self.assert_equal(events[0].state, BFDState.down, BFDState)
1152         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1153
1154     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1155     def test_echo_stop(self):
1156         """ echo function stops if peer sets required min echo rx zero """
1157         bfd_session_up(self)
1158         self.test_session.update(required_min_echo_rx=50000)
1159         self.test_session.send_packet()
1160         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1161         # wait for first echo packet
1162         while True:
1163             p = self.pg0.wait_for_packet(1)
1164             self.logger.debug(ppp("Got packet:", p))
1165             if p[UDP].dport == BFD.udp_dport_echo:
1166                 self.logger.debug(ppp("Looping back packet:", p))
1167                 p[Ether].dst = self.pg0.local_mac
1168                 self.pg0.add_stream(p)
1169                 self.pg_start()
1170                 break
1171             elif p.haslayer(BFD):
1172                 # ignore BFD
1173                 pass
1174             else:
1175                 raise Exception(ppp("Received unknown packet:", p))
1176         self.test_session.update(required_min_echo_rx=0)
1177         self.test_session.send_packet()
1178         # echo packets shouldn't arrive anymore
1179         for dummy in range(5):
1180             wait_for_bfd_packet(
1181                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1182             self.test_session.send_packet()
1183             events = self.vapi.collect_events()
1184             self.assert_equal(len(events), 0, "number of bfd events")
1185
1186     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1187     def test_echo_source_removed(self):
1188         """ echo function stops if echo source is removed """
1189         bfd_session_up(self)
1190         self.test_session.update(required_min_echo_rx=50000)
1191         self.test_session.send_packet()
1192         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1193         # wait for first echo packet
1194         while True:
1195             p = self.pg0.wait_for_packet(1)
1196             self.logger.debug(ppp("Got packet:", p))
1197             if p[UDP].dport == BFD.udp_dport_echo:
1198                 self.logger.debug(ppp("Looping back packet:", p))
1199                 p[Ether].dst = self.pg0.local_mac
1200                 self.pg0.add_stream(p)
1201                 self.pg_start()
1202                 break
1203             elif p.haslayer(BFD):
1204                 # ignore BFD
1205                 pass
1206             else:
1207                 raise Exception(ppp("Received unknown packet:", p))
1208         self.vapi.bfd_udp_del_echo_source()
1209         self.test_session.send_packet()
1210         # echo packets shouldn't arrive anymore
1211         for dummy in range(5):
1212             wait_for_bfd_packet(
1213                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1214             self.test_session.send_packet()
1215             events = self.vapi.collect_events()
1216             self.assert_equal(len(events), 0, "number of bfd events")
1217
1218     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1219     def test_stale_echo(self):
1220         """ stale echo packets don't keep a session up """
1221         bfd_session_up(self)
1222         self.test_session.update(required_min_echo_rx=50000)
1223         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1224         self.test_session.send_packet()
1225         # should be turned on - loopback echo packets
1226         echo_packet = None
1227         timeout_at = None
1228         timeout_ok = False
1229         for dummy in range(10 * self.vpp_session.detect_mult):
1230             p = self.pg0.wait_for_packet(1)
1231             if p[UDP].dport == BFD.udp_dport_echo:
1232                 if echo_packet is None:
1233                     self.logger.debug(ppp("Got first echo packet:", p))
1234                     echo_packet = p
1235                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1236                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1237                 else:
1238                     self.logger.debug(ppp("Got followup echo packet:", p))
1239                 self.logger.debug(ppp("Looping back first echo packet:", p))
1240                 echo_packet[Ether].dst = self.pg0.local_mac
1241                 self.pg0.add_stream(echo_packet)
1242                 self.pg_start()
1243             elif p.haslayer(BFD):
1244                 self.logger.debug(ppp("Got packet:", p))
1245                 if "P" in p.sprintf("%BFD.flags%"):
1246                     final = self.test_session.create_packet()
1247                     final[BFD].flags = "F"
1248                     self.test_session.send_packet(final)
1249                 if p[BFD].state == BFDState.down:
1250                     self.assertIsNotNone(
1251                         timeout_at,
1252                         "Session went down before first echo packet received")
1253                     now = time.time()
1254                     self.assertGreaterEqual(
1255                         now, timeout_at,
1256                         "Session timeout at %s, but is expected at %s" %
1257                         (now, timeout_at))
1258                     self.assert_equal(p[BFD].diag,
1259                                       BFDDiagCode.echo_function_failed,
1260                                       BFDDiagCode)
1261                     events = self.vapi.collect_events()
1262                     self.assert_equal(len(events), 1, "number of bfd events")
1263                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1264                     timeout_ok = True
1265                     break
1266             else:
1267                 raise Exception(ppp("Received unknown packet:", p))
1268             self.test_session.send_packet()
1269         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1270
1271     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1272     def test_invalid_echo_checksum(self):
1273         """ echo packets with invalid checksum don't keep a session up """
1274         bfd_session_up(self)
1275         self.test_session.update(required_min_echo_rx=50000)
1276         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1277         self.test_session.send_packet()
1278         # should be turned on - loopback echo packets
1279         timeout_at = None
1280         timeout_ok = False
1281         for dummy in range(10 * self.vpp_session.detect_mult):
1282             p = self.pg0.wait_for_packet(1)
1283             if p[UDP].dport == BFD.udp_dport_echo:
1284                 self.logger.debug(ppp("Got echo packet:", p))
1285                 if timeout_at is None:
1286                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1287                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1288                 p[BFD_vpp_echo].checksum = getrandbits(64)
1289                 p[Ether].dst = self.pg0.local_mac
1290                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1291                 self.pg0.add_stream(p)
1292                 self.pg_start()
1293             elif p.haslayer(BFD):
1294                 self.logger.debug(ppp("Got packet:", p))
1295                 if "P" in p.sprintf("%BFD.flags%"):
1296                     final = self.test_session.create_packet()
1297                     final[BFD].flags = "F"
1298                     self.test_session.send_packet(final)
1299                 if p[BFD].state == BFDState.down:
1300                     self.assertIsNotNone(
1301                         timeout_at,
1302                         "Session went down before first echo packet received")
1303                     now = time.time()
1304                     self.assertGreaterEqual(
1305                         now, timeout_at,
1306                         "Session timeout at %s, but is expected at %s" %
1307                         (now, timeout_at))
1308                     self.assert_equal(p[BFD].diag,
1309                                       BFDDiagCode.echo_function_failed,
1310                                       BFDDiagCode)
1311                     events = self.vapi.collect_events()
1312                     self.assert_equal(len(events), 1, "number of bfd events")
1313                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1314                     timeout_ok = True
1315                     break
1316             else:
1317                 raise Exception(ppp("Received unknown packet:", p))
1318             self.test_session.send_packet()
1319         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1320
1321     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1322     def test_admin_up_down(self):
1323         """ put session admin-up and admin-down """
1324         bfd_session_up(self)
1325         self.vpp_session.admin_down()
1326         self.pg0.enable_capture()
1327         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1328         verify_event(self, e, expected_state=BFDState.admin_down)
1329         for dummy in range(2):
1330             p = wait_for_bfd_packet(self)
1331             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1332         # try to bring session up - shouldn't be possible
1333         self.test_session.update(state=BFDState.init)
1334         self.test_session.send_packet()
1335         for dummy in range(2):
1336             p = wait_for_bfd_packet(self)
1337             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1338         self.vpp_session.admin_up()
1339         self.test_session.update(state=BFDState.down)
1340         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1341         verify_event(self, e, expected_state=BFDState.down)
1342         p = wait_for_bfd_packet(
1343             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1344         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1345         self.test_session.send_packet()
1346         p = wait_for_bfd_packet(
1347             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1348         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1349         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1350         verify_event(self, e, expected_state=BFDState.init)
1351         self.test_session.update(state=BFDState.up)
1352         self.test_session.send_packet()
1353         p = wait_for_bfd_packet(
1354             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1355         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1356         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1357         verify_event(self, e, expected_state=BFDState.up)
1358
1359     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1360     def test_config_change_remote_demand(self):
1361         """ configuration change while peer in demand mode """
1362         bfd_session_up(self)
1363         demand = self.test_session.create_packet()
1364         demand[BFD].flags = "D"
1365         self.test_session.send_packet(demand)
1366         self.vpp_session.modify_parameters(
1367             required_min_rx=2 * self.vpp_session.required_min_rx)
1368         p = wait_for_bfd_packet(
1369             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1370         # poll bit must be set
1371         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1372         # terminate poll sequence
1373         final = self.test_session.create_packet()
1374         final[BFD].flags = "D+F"
1375         self.test_session.send_packet(final)
1376         # vpp should be quiet now again
1377         transmit_time = 0.9 \
1378             * max(self.vpp_session.required_min_rx,
1379                   self.test_session.desired_min_tx) \
1380             / USEC_IN_SEC
1381         count = 0
1382         for dummy in range(self.test_session.detect_mult * 2):
1383             time.sleep(transmit_time)
1384             self.test_session.send_packet(demand)
1385             try:
1386                 p = wait_for_bfd_packet(self, timeout=0)
1387                 self.logger.error(ppp("Received unexpected packet:", p))
1388                 count += 1
1389             except CaptureTimeoutError:
1390                 pass
1391         events = self.vapi.collect_events()
1392         for e in events:
1393             self.logger.error("Received unexpected event: %s", e)
1394         self.assert_equal(count, 0, "number of packets received")
1395         self.assert_equal(len(events), 0, "number of events received")
1396
1397
1398 class BFD6TestCase(VppTestCase):
1399     """Bidirectional Forwarding Detection (BFD) (IPv6) """
1400
1401     pg0 = None
1402     vpp_clock_offset = None
1403     vpp_session = None
1404     test_session = None
1405
1406     @classmethod
1407     def setUpClass(cls):
1408         super(BFD6TestCase, cls).setUpClass()
1409         try:
1410             cls.create_pg_interfaces([0])
1411             cls.pg0.config_ip6()
1412             cls.pg0.configure_ipv6_neighbors()
1413             cls.pg0.admin_up()
1414             cls.pg0.resolve_ndp()
1415             cls.create_loopback_interfaces([0])
1416             cls.loopback0 = cls.lo_interfaces[0]
1417             cls.loopback0.config_ip6()
1418             cls.loopback0.admin_up()
1419
1420         except Exception:
1421             super(BFD6TestCase, cls).tearDownClass()
1422             raise
1423
1424     def setUp(self):
1425         super(BFD6TestCase, self).setUp()
1426         self.factory = AuthKeyFactory()
1427         self.vapi.want_bfd_events()
1428         self.pg0.enable_capture()
1429         try:
1430             self.vpp_session = VppBFDUDPSession(self, self.pg0,
1431                                                 self.pg0.remote_ip6,
1432                                                 af=AF_INET6)
1433             self.vpp_session.add_vpp_config()
1434             self.vpp_session.admin_up()
1435             self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1436             self.logger.debug(self.vapi.cli("show adj nbr"))
1437         except:
1438             self.vapi.want_bfd_events(enable_disable=0)
1439             raise
1440
1441     def tearDown(self):
1442         if not self.vpp_dead:
1443             self.vapi.want_bfd_events(enable_disable=0)
1444         self.vapi.collect_events()  # clear the event queue
1445         super(BFD6TestCase, self).tearDown()
1446
1447     def test_session_up(self):
1448         """ bring BFD session up """
1449         bfd_session_up(self)
1450
1451     def test_session_up_by_ip(self):
1452         """ bring BFD session up - first frame looked up by address pair """
1453         self.logger.info("BFD: Sending Slow control frame")
1454         self.test_session.update(my_discriminator=randint(0, 40000000))
1455         self.test_session.send_packet()
1456         self.pg0.enable_capture()
1457         p = self.pg0.wait_for_packet(1)
1458         self.assert_equal(p[BFD].your_discriminator,
1459                           self.test_session.my_discriminator,
1460                           "BFD - your discriminator")
1461         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1462         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1463                                  state=BFDState.up)
1464         self.logger.info("BFD: Waiting for event")
1465         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1466         verify_event(self, e, expected_state=BFDState.init)
1467         self.logger.info("BFD: Sending Up")
1468         self.test_session.send_packet()
1469         self.logger.info("BFD: Waiting for event")
1470         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1471         verify_event(self, e, expected_state=BFDState.up)
1472         self.logger.info("BFD: Session is Up")
1473         self.test_session.update(state=BFDState.up)
1474         self.test_session.send_packet()
1475         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1476
1477     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1478     def test_hold_up(self):
1479         """ hold BFD session up """
1480         bfd_session_up(self)
1481         for dummy in range(self.test_session.detect_mult * 2):
1482             wait_for_bfd_packet(self)
1483             self.test_session.send_packet()
1484         self.assert_equal(len(self.vapi.collect_events()), 0,
1485                           "number of bfd events")
1486         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1487
1488     def test_echo_looped_back(self):
1489         """ echo packets looped back """
1490         # don't need a session in this case..
1491         self.vpp_session.remove_vpp_config()
1492         self.pg0.enable_capture()
1493         echo_packet_count = 10
1494         # random source port low enough to increment a few times..
1495         udp_sport_tx = randint(1, 50000)
1496         udp_sport_rx = udp_sport_tx
1497         echo_packet = (Ether(src=self.pg0.remote_mac,
1498                              dst=self.pg0.local_mac) /
1499                        IPv6(src=self.pg0.remote_ip6,
1500                             dst=self.pg0.remote_ip6) /
1501                        UDP(dport=BFD.udp_dport_echo) /
1502                        Raw("this should be looped back"))
1503         for dummy in range(echo_packet_count):
1504             self.sleep(.01, "delay between echo packets")
1505             echo_packet[UDP].sport = udp_sport_tx
1506             udp_sport_tx += 1
1507             self.logger.debug(ppp("Sending packet:", echo_packet))
1508             self.pg0.add_stream(echo_packet)
1509             self.pg_start()
1510         for dummy in range(echo_packet_count):
1511             p = self.pg0.wait_for_packet(1)
1512             self.logger.debug(ppp("Got packet:", p))
1513             ether = p[Ether]
1514             self.assert_equal(self.pg0.remote_mac,
1515                               ether.dst, "Destination MAC")
1516             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1517             ip = p[IPv6]
1518             self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1519             self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1520             udp = p[UDP]
1521             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1522                               "UDP destination port")
1523             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1524             udp_sport_rx += 1
1525             # need to compare the hex payload here, otherwise BFD_vpp_echo
1526             # gets in way
1527             self.assertEqual(str(p[UDP].payload),
1528                              str(echo_packet[UDP].payload),
1529                              "Received packet is not the echo packet sent")
1530         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1531                           "ECHO packet identifier for test purposes)")
1532         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1533                           "ECHO packet identifier for test purposes)")
1534
1535     def test_echo(self):
1536         """ echo function used """
1537         bfd_session_up(self)
1538         self.test_session.update(required_min_echo_rx=50000)
1539         self.test_session.send_packet()
1540         detection_time = self.test_session.detect_mult *\
1541             self.vpp_session.required_min_rx / USEC_IN_SEC
1542         # echo shouldn't work without echo source set
1543         for dummy in range(3):
1544             sleep = 0.75 * detection_time
1545             self.sleep(sleep, "delay before sending bfd packet")
1546             self.test_session.send_packet()
1547         p = wait_for_bfd_packet(
1548             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1549         self.assert_equal(p[BFD].required_min_rx_interval,
1550                           self.vpp_session.required_min_rx,
1551                           "BFD required min rx interval")
1552         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1553         # should be turned on - loopback echo packets
1554         for dummy in range(3):
1555             loop_until = time.time() + 0.75 * detection_time
1556             while time.time() < loop_until:
1557                 p = self.pg0.wait_for_packet(1)
1558                 self.logger.debug(ppp("Got packet:", p))
1559                 if p[UDP].dport == BFD.udp_dport_echo:
1560                     self.assert_equal(
1561                         p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1562                     self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1563                                         "BFD ECHO src IP equal to loopback IP")
1564                     self.logger.debug(ppp("Looping back packet:", p))
1565                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1566                                       "ECHO packet destination MAC address")
1567                     p[Ether].dst = self.pg0.local_mac
1568                     self.pg0.add_stream(p)
1569                     self.pg_start()
1570                 elif p.haslayer(BFD):
1571                     self.assertGreaterEqual(p[BFD].required_min_rx_interval,
1572                                             1000000)
1573                     if "P" in p.sprintf("%BFD.flags%"):
1574                         final = self.test_session.create_packet()
1575                         final[BFD].flags = "F"
1576                         self.test_session.send_packet(final)
1577                 else:
1578                     raise Exception(ppp("Received unknown packet:", p))
1579
1580                 self.assert_equal(len(self.vapi.collect_events()), 0,
1581                                   "number of bfd events")
1582             self.test_session.send_packet()
1583
1584
1585 class BFDSHA1TestCase(VppTestCase):
1586     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1587
1588     pg0 = None
1589     vpp_clock_offset = None
1590     vpp_session = None
1591     test_session = None
1592
1593     @classmethod
1594     def setUpClass(cls):
1595         super(BFDSHA1TestCase, cls).setUpClass()
1596         try:
1597             cls.create_pg_interfaces([0])
1598             cls.pg0.config_ip4()
1599             cls.pg0.admin_up()
1600             cls.pg0.resolve_arp()
1601
1602         except Exception:
1603             super(BFDSHA1TestCase, cls).tearDownClass()
1604             raise
1605
1606     def setUp(self):
1607         super(BFDSHA1TestCase, self).setUp()
1608         self.factory = AuthKeyFactory()
1609         self.vapi.want_bfd_events()
1610         self.pg0.enable_capture()
1611
1612     def tearDown(self):
1613         if not self.vpp_dead:
1614             self.vapi.want_bfd_events(enable_disable=0)
1615         self.vapi.collect_events()  # clear the event queue
1616         super(BFDSHA1TestCase, self).tearDown()
1617
1618     def test_session_up(self):
1619         """ bring BFD session up """
1620         key = self.factory.create_random_key(self)
1621         key.add_vpp_config()
1622         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1623                                             self.pg0.remote_ip4,
1624                                             sha1_key=key)
1625         self.vpp_session.add_vpp_config()
1626         self.vpp_session.admin_up()
1627         self.test_session = BFDTestSession(
1628             self, self.pg0, AF_INET, sha1_key=key,
1629             bfd_key_id=self.vpp_session.bfd_key_id)
1630         bfd_session_up(self)
1631
1632     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1633     def test_hold_up(self):
1634         """ hold BFD session up """
1635         key = self.factory.create_random_key(self)
1636         key.add_vpp_config()
1637         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1638                                             self.pg0.remote_ip4,
1639                                             sha1_key=key)
1640         self.vpp_session.add_vpp_config()
1641         self.vpp_session.admin_up()
1642         self.test_session = BFDTestSession(
1643             self, self.pg0, AF_INET, sha1_key=key,
1644             bfd_key_id=self.vpp_session.bfd_key_id)
1645         bfd_session_up(self)
1646         for dummy in range(self.test_session.detect_mult * 2):
1647             wait_for_bfd_packet(self)
1648             self.test_session.send_packet()
1649         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1650
1651     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1652     def test_hold_up_meticulous(self):
1653         """ hold BFD session up - meticulous auth """
1654         key = self.factory.create_random_key(
1655             self, BFDAuthType.meticulous_keyed_sha1)
1656         key.add_vpp_config()
1657         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1658                                             self.pg0.remote_ip4, sha1_key=key)
1659         self.vpp_session.add_vpp_config()
1660         self.vpp_session.admin_up()
1661         # specify sequence number so that it wraps
1662         self.test_session = BFDTestSession(
1663             self, self.pg0, AF_INET, sha1_key=key,
1664             bfd_key_id=self.vpp_session.bfd_key_id,
1665             our_seq_number=0xFFFFFFFF - 4)
1666         bfd_session_up(self)
1667         for dummy in range(30):
1668             wait_for_bfd_packet(self)
1669             self.test_session.inc_seq_num()
1670             self.test_session.send_packet()
1671         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1672
1673     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1674     def test_send_bad_seq_number(self):
1675         """ session is not kept alive by msgs with bad sequence numbers"""
1676         key = self.factory.create_random_key(
1677             self, BFDAuthType.meticulous_keyed_sha1)
1678         key.add_vpp_config()
1679         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1680                                             self.pg0.remote_ip4, sha1_key=key)
1681         self.vpp_session.add_vpp_config()
1682         self.test_session = BFDTestSession(
1683             self, self.pg0, AF_INET, sha1_key=key,
1684             bfd_key_id=self.vpp_session.bfd_key_id)
1685         bfd_session_up(self)
1686         detection_time = self.test_session.detect_mult *\
1687             self.vpp_session.required_min_rx / USEC_IN_SEC
1688         send_until = time.time() + 2 * detection_time
1689         while time.time() < send_until:
1690             self.test_session.send_packet()
1691             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1692                        "time between bfd packets")
1693         e = self.vapi.collect_events()
1694         # session should be down now, because the sequence numbers weren't
1695         # updated
1696         self.assert_equal(len(e), 1, "number of bfd events")
1697         verify_event(self, e[0], expected_state=BFDState.down)
1698
1699     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1700                                        legitimate_test_session,
1701                                        rogue_test_session,
1702                                        rogue_bfd_values=None):
1703         """ execute a rogue session interaction scenario
1704
1705         1. create vpp session, add config
1706         2. bring the legitimate session up
1707         3. copy the bfd values from legitimate session to rogue session
1708         4. apply rogue_bfd_values to rogue session
1709         5. set rogue session state to down
1710         6. send message to take the session down from the rogue session
1711         7. assert that the legitimate session is unaffected
1712         """
1713
1714         self.vpp_session = vpp_bfd_udp_session
1715         self.vpp_session.add_vpp_config()
1716         self.test_session = legitimate_test_session
1717         # bring vpp session up
1718         bfd_session_up(self)
1719         # send packet from rogue session
1720         rogue_test_session.update(
1721             my_discriminator=self.test_session.my_discriminator,
1722             your_discriminator=self.test_session.your_discriminator,
1723             desired_min_tx=self.test_session.desired_min_tx,
1724             required_min_rx=self.test_session.required_min_rx,
1725             detect_mult=self.test_session.detect_mult,
1726             diag=self.test_session.diag,
1727             state=self.test_session.state,
1728             auth_type=self.test_session.auth_type)
1729         if rogue_bfd_values:
1730             rogue_test_session.update(**rogue_bfd_values)
1731         rogue_test_session.update(state=BFDState.down)
1732         rogue_test_session.send_packet()
1733         wait_for_bfd_packet(self)
1734         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1735
1736     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1737     def test_mismatch_auth(self):
1738         """ session is not brought down by unauthenticated msg """
1739         key = self.factory.create_random_key(self)
1740         key.add_vpp_config()
1741         vpp_session = VppBFDUDPSession(
1742             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1743         legitimate_test_session = BFDTestSession(
1744             self, self.pg0, AF_INET, sha1_key=key,
1745             bfd_key_id=vpp_session.bfd_key_id)
1746         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1747         self.execute_rogue_session_scenario(vpp_session,
1748                                             legitimate_test_session,
1749                                             rogue_test_session)
1750
1751     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1752     def test_mismatch_bfd_key_id(self):
1753         """ session is not brought down by msg with non-existent key-id """
1754         key = self.factory.create_random_key(self)
1755         key.add_vpp_config()
1756         vpp_session = VppBFDUDPSession(
1757             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1758         # pick a different random bfd key id
1759         x = randint(0, 255)
1760         while x == vpp_session.bfd_key_id:
1761             x = randint(0, 255)
1762         legitimate_test_session = BFDTestSession(
1763             self, self.pg0, AF_INET, sha1_key=key,
1764             bfd_key_id=vpp_session.bfd_key_id)
1765         rogue_test_session = BFDTestSession(
1766             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1767         self.execute_rogue_session_scenario(vpp_session,
1768                                             legitimate_test_session,
1769                                             rogue_test_session)
1770
1771     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1772     def test_mismatched_auth_type(self):
1773         """ session is not brought down by msg with wrong auth type """
1774         key = self.factory.create_random_key(self)
1775         key.add_vpp_config()
1776         vpp_session = VppBFDUDPSession(
1777             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1778         legitimate_test_session = BFDTestSession(
1779             self, self.pg0, AF_INET, sha1_key=key,
1780             bfd_key_id=vpp_session.bfd_key_id)
1781         rogue_test_session = BFDTestSession(
1782             self, self.pg0, AF_INET, sha1_key=key,
1783             bfd_key_id=vpp_session.bfd_key_id)
1784         self.execute_rogue_session_scenario(
1785             vpp_session, legitimate_test_session, rogue_test_session,
1786             {'auth_type': BFDAuthType.keyed_md5})
1787
1788     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1789     def test_restart(self):
1790         """ simulate remote peer restart and resynchronization """
1791         key = self.factory.create_random_key(
1792             self, BFDAuthType.meticulous_keyed_sha1)
1793         key.add_vpp_config()
1794         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1795                                             self.pg0.remote_ip4, sha1_key=key)
1796         self.vpp_session.add_vpp_config()
1797         self.test_session = BFDTestSession(
1798             self, self.pg0, AF_INET, sha1_key=key,
1799             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
1800         bfd_session_up(self)
1801         # don't send any packets for 2*detection_time
1802         detection_time = self.test_session.detect_mult *\
1803             self.vpp_session.required_min_rx / USEC_IN_SEC
1804         self.sleep(2 * detection_time, "simulating peer restart")
1805         events = self.vapi.collect_events()
1806         self.assert_equal(len(events), 1, "number of bfd events")
1807         verify_event(self, events[0], expected_state=BFDState.down)
1808         self.test_session.update(state=BFDState.down)
1809         # reset sequence number
1810         self.test_session.our_seq_number = 0
1811         self.test_session.vpp_seq_number = None
1812         # now throw away any pending packets
1813         self.pg0.enable_capture()
1814         bfd_session_up(self)
1815
1816
1817 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1818 class BFDAuthOnOffTestCase(VppTestCase):
1819     """Bidirectional Forwarding Detection (BFD) (changing auth) """
1820
1821     pg0 = None
1822     vpp_session = None
1823     test_session = None
1824
1825     @classmethod
1826     def setUpClass(cls):
1827         super(BFDAuthOnOffTestCase, cls).setUpClass()
1828         try:
1829             cls.create_pg_interfaces([0])
1830             cls.pg0.config_ip4()
1831             cls.pg0.admin_up()
1832             cls.pg0.resolve_arp()
1833
1834         except Exception:
1835             super(BFDAuthOnOffTestCase, cls).tearDownClass()
1836             raise
1837
1838     def setUp(self):
1839         super(BFDAuthOnOffTestCase, self).setUp()
1840         self.factory = AuthKeyFactory()
1841         self.vapi.want_bfd_events()
1842         self.pg0.enable_capture()
1843
1844     def tearDown(self):
1845         if not self.vpp_dead:
1846             self.vapi.want_bfd_events(enable_disable=0)
1847         self.vapi.collect_events()  # clear the event queue
1848         super(BFDAuthOnOffTestCase, self).tearDown()
1849
1850     def test_auth_on_immediate(self):
1851         """ turn auth on without disturbing session state (immediate) """
1852         key = self.factory.create_random_key(self)
1853         key.add_vpp_config()
1854         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1855                                             self.pg0.remote_ip4)
1856         self.vpp_session.add_vpp_config()
1857         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1858         bfd_session_up(self)
1859         for dummy in range(self.test_session.detect_mult * 2):
1860             p = wait_for_bfd_packet(self)
1861             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1862             self.test_session.send_packet()
1863         self.vpp_session.activate_auth(key)
1864         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1865         self.test_session.sha1_key = key
1866         for dummy in range(self.test_session.detect_mult * 2):
1867             p = wait_for_bfd_packet(self)
1868             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1869             self.test_session.send_packet()
1870         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1871         self.assert_equal(len(self.vapi.collect_events()), 0,
1872                           "number of bfd events")
1873
1874     def test_auth_off_immediate(self):
1875         """ turn auth off without disturbing session state (immediate) """
1876         key = self.factory.create_random_key(self)
1877         key.add_vpp_config()
1878         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1879                                             self.pg0.remote_ip4, sha1_key=key)
1880         self.vpp_session.add_vpp_config()
1881         self.test_session = BFDTestSession(
1882             self, self.pg0, AF_INET, sha1_key=key,
1883             bfd_key_id=self.vpp_session.bfd_key_id)
1884         bfd_session_up(self)
1885         # self.vapi.want_bfd_events(enable_disable=0)
1886         for dummy in range(self.test_session.detect_mult * 2):
1887             p = wait_for_bfd_packet(self)
1888             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1889             self.test_session.inc_seq_num()
1890             self.test_session.send_packet()
1891         self.vpp_session.deactivate_auth()
1892         self.test_session.bfd_key_id = None
1893         self.test_session.sha1_key = None
1894         for dummy in range(self.test_session.detect_mult * 2):
1895             p = wait_for_bfd_packet(self)
1896             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1897             self.test_session.inc_seq_num()
1898             self.test_session.send_packet()
1899         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1900         self.assert_equal(len(self.vapi.collect_events()), 0,
1901                           "number of bfd events")
1902
1903     def test_auth_change_key_immediate(self):
1904         """ change auth key without disturbing session state (immediate) """
1905         key1 = self.factory.create_random_key(self)
1906         key1.add_vpp_config()
1907         key2 = self.factory.create_random_key(self)
1908         key2.add_vpp_config()
1909         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1910                                             self.pg0.remote_ip4, sha1_key=key1)
1911         self.vpp_session.add_vpp_config()
1912         self.test_session = BFDTestSession(
1913             self, self.pg0, AF_INET, sha1_key=key1,
1914             bfd_key_id=self.vpp_session.bfd_key_id)
1915         bfd_session_up(self)
1916         for dummy in range(self.test_session.detect_mult * 2):
1917             p = wait_for_bfd_packet(self)
1918             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1919             self.test_session.send_packet()
1920         self.vpp_session.activate_auth(key2)
1921         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1922         self.test_session.sha1_key = key2
1923         for dummy in range(self.test_session.detect_mult * 2):
1924             p = wait_for_bfd_packet(self)
1925             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1926             self.test_session.send_packet()
1927         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1928         self.assert_equal(len(self.vapi.collect_events()), 0,
1929                           "number of bfd events")
1930
1931     def test_auth_on_delayed(self):
1932         """ turn auth on without disturbing session state (delayed) """
1933         key = self.factory.create_random_key(self)
1934         key.add_vpp_config()
1935         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1936                                             self.pg0.remote_ip4)
1937         self.vpp_session.add_vpp_config()
1938         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1939         bfd_session_up(self)
1940         for dummy in range(self.test_session.detect_mult * 2):
1941             wait_for_bfd_packet(self)
1942             self.test_session.send_packet()
1943         self.vpp_session.activate_auth(key, delayed=True)
1944         for dummy in range(self.test_session.detect_mult * 2):
1945             p = wait_for_bfd_packet(self)
1946             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1947             self.test_session.send_packet()
1948         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1949         self.test_session.sha1_key = key
1950         self.test_session.send_packet()
1951         for dummy in range(self.test_session.detect_mult * 2):
1952             p = wait_for_bfd_packet(self)
1953             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1954             self.test_session.send_packet()
1955         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1956         self.assert_equal(len(self.vapi.collect_events()), 0,
1957                           "number of bfd events")
1958
1959     def test_auth_off_delayed(self):
1960         """ turn auth off without disturbing session state (delayed) """
1961         key = self.factory.create_random_key(self)
1962         key.add_vpp_config()
1963         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1964                                             self.pg0.remote_ip4, sha1_key=key)
1965         self.vpp_session.add_vpp_config()
1966         self.test_session = BFDTestSession(
1967             self, self.pg0, AF_INET, sha1_key=key,
1968             bfd_key_id=self.vpp_session.bfd_key_id)
1969         bfd_session_up(self)
1970         for dummy in range(self.test_session.detect_mult * 2):
1971             p = wait_for_bfd_packet(self)
1972             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1973             self.test_session.send_packet()
1974         self.vpp_session.deactivate_auth(delayed=True)
1975         for dummy in range(self.test_session.detect_mult * 2):
1976             p = wait_for_bfd_packet(self)
1977             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1978             self.test_session.send_packet()
1979         self.test_session.bfd_key_id = None
1980         self.test_session.sha1_key = None
1981         self.test_session.send_packet()
1982         for dummy in range(self.test_session.detect_mult * 2):
1983             p = wait_for_bfd_packet(self)
1984             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1985             self.test_session.send_packet()
1986         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1987         self.assert_equal(len(self.vapi.collect_events()), 0,
1988                           "number of bfd events")
1989
1990     def test_auth_change_key_delayed(self):
1991         """ change auth key without disturbing session state (delayed) """
1992         key1 = self.factory.create_random_key(self)
1993         key1.add_vpp_config()
1994         key2 = self.factory.create_random_key(self)
1995         key2.add_vpp_config()
1996         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1997                                             self.pg0.remote_ip4, sha1_key=key1)
1998         self.vpp_session.add_vpp_config()
1999         self.vpp_session.admin_up()
2000         self.test_session = BFDTestSession(
2001             self, self.pg0, AF_INET, sha1_key=key1,
2002             bfd_key_id=self.vpp_session.bfd_key_id)
2003         bfd_session_up(self)
2004         for dummy in range(self.test_session.detect_mult * 2):
2005             p = wait_for_bfd_packet(self)
2006             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2007             self.test_session.send_packet()
2008         self.vpp_session.activate_auth(key2, delayed=True)
2009         for dummy in range(self.test_session.detect_mult * 2):
2010             p = wait_for_bfd_packet(self)
2011             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2012             self.test_session.send_packet()
2013         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2014         self.test_session.sha1_key = key2
2015         self.test_session.send_packet()
2016         for dummy in range(self.test_session.detect_mult * 2):
2017             p = wait_for_bfd_packet(self)
2018             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2019             self.test_session.send_packet()
2020         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2021         self.assert_equal(len(self.vapi.collect_events()), 0,
2022                           "number of bfd events")
2023
2024
2025 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2026 class BFDCLITestCase(VppTestCase):
2027     """Bidirectional Forwarding Detection (BFD) (CLI) """
2028     pg0 = None
2029
2030     @classmethod
2031     def setUpClass(cls):
2032         super(BFDCLITestCase, cls).setUpClass()
2033
2034         try:
2035             cls.create_pg_interfaces((0,))
2036             cls.pg0.config_ip4()
2037             cls.pg0.config_ip6()
2038             cls.pg0.resolve_arp()
2039             cls.pg0.resolve_ndp()
2040
2041         except Exception:
2042             super(BFDCLITestCase, cls).tearDownClass()
2043             raise
2044
2045     def setUp(self):
2046         super(BFDCLITestCase, self).setUp()
2047         self.factory = AuthKeyFactory()
2048         self.pg0.enable_capture()
2049
2050     def tearDown(self):
2051         try:
2052             self.vapi.want_bfd_events(enable_disable=0)
2053         except UnexpectedApiReturnValueError:
2054             # some tests aren't subscribed, so this is not an issue
2055             pass
2056         self.vapi.collect_events()  # clear the event queue
2057         super(BFDCLITestCase, self).tearDown()
2058
2059     def cli_verify_no_response(self, cli):
2060         """ execute a CLI, asserting that the response is empty """
2061         self.assert_equal(self.vapi.cli(cli),
2062                           "",
2063                           "CLI command response")
2064
2065     def cli_verify_response(self, cli, expected):
2066         """ execute a CLI, asserting that the response matches expectation """
2067         self.assert_equal(self.vapi.cli(cli).strip(),
2068                           expected,
2069                           "CLI command response")
2070
2071     def test_show(self):
2072         """ show commands """
2073         k1 = self.factory.create_random_key(self)
2074         k1.add_vpp_config()
2075         k2 = self.factory.create_random_key(
2076             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2077         k2.add_vpp_config()
2078         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2079         s1.add_vpp_config()
2080         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2081                               sha1_key=k2)
2082         s2.add_vpp_config()
2083         self.logger.info(self.vapi.ppcli("show bfd keys"))
2084         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2085         self.logger.info(self.vapi.ppcli("show bfd"))
2086
2087     def test_set_del_sha1_key(self):
2088         """ set/delete SHA1 auth key """
2089         k = self.factory.create_random_key(self)
2090         self.registry.register(k, self.logger)
2091         self.cli_verify_no_response(
2092             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2093             (k.conf_key_id,
2094                 "".join("{:02x}".format(ord(c)) for c in k.key)))
2095         self.assertTrue(k.query_vpp_config())
2096         self.vpp_session = VppBFDUDPSession(
2097             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2098         self.vpp_session.add_vpp_config()
2099         self.test_session = \
2100             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2101                            bfd_key_id=self.vpp_session.bfd_key_id)
2102         self.vapi.want_bfd_events()
2103         bfd_session_up(self)
2104         bfd_session_down(self)
2105         # try to replace the secret for the key - should fail because the key
2106         # is in-use
2107         k2 = self.factory.create_random_key(self)
2108         self.cli_verify_response(
2109             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2110             (k.conf_key_id,
2111                 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2112             "bfd key set: `bfd_auth_set_key' API call failed, "
2113             "rv=-103:BFD object in use")
2114         # manipulating the session using old secret should still work
2115         bfd_session_up(self)
2116         bfd_session_down(self)
2117         self.vpp_session.remove_vpp_config()
2118         self.cli_verify_no_response(
2119             "bfd key del conf-key-id %s" % k.conf_key_id)
2120         self.assertFalse(k.query_vpp_config())
2121
2122     def test_set_del_meticulous_sha1_key(self):
2123         """ set/delete meticulous SHA1 auth key """
2124         k = self.factory.create_random_key(
2125             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2126         self.registry.register(k, self.logger)
2127         self.cli_verify_no_response(
2128             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2129             (k.conf_key_id,
2130                 "".join("{:02x}".format(ord(c)) for c in k.key)))
2131         self.assertTrue(k.query_vpp_config())
2132         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2133                                             self.pg0.remote_ip6, af=AF_INET6,
2134                                             sha1_key=k)
2135         self.vpp_session.add_vpp_config()
2136         self.vpp_session.admin_up()
2137         self.test_session = \
2138             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2139                            bfd_key_id=self.vpp_session.bfd_key_id)
2140         self.vapi.want_bfd_events()
2141         bfd_session_up(self)
2142         bfd_session_down(self)
2143         # try to replace the secret for the key - should fail because the key
2144         # is in-use
2145         k2 = self.factory.create_random_key(self)
2146         self.cli_verify_response(
2147             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2148             (k.conf_key_id,
2149                 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2150             "bfd key set: `bfd_auth_set_key' API call failed, "
2151             "rv=-103:BFD object in use")
2152         # manipulating the session using old secret should still work
2153         bfd_session_up(self)
2154         bfd_session_down(self)
2155         self.vpp_session.remove_vpp_config()
2156         self.cli_verify_no_response(
2157             "bfd key del conf-key-id %s" % k.conf_key_id)
2158         self.assertFalse(k.query_vpp_config())
2159
2160     def test_add_mod_del_bfd_udp(self):
2161         """ create/modify/delete IPv4 BFD UDP session """
2162         vpp_session = VppBFDUDPSession(
2163             self, self.pg0, self.pg0.remote_ip4)
2164         self.registry.register(vpp_session, self.logger)
2165         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2166             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2167             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2168                                 self.pg0.remote_ip4,
2169                                 vpp_session.desired_min_tx,
2170                                 vpp_session.required_min_rx,
2171                                 vpp_session.detect_mult)
2172         self.cli_verify_no_response(cli_add_cmd)
2173         # 2nd add should fail
2174         self.cli_verify_response(
2175             cli_add_cmd,
2176             "bfd udp session add: `bfd_add_add_session' API call"
2177             " failed, rv=-101:Duplicate BFD object")
2178         verify_bfd_session_config(self, vpp_session)
2179         mod_session = VppBFDUDPSession(
2180             self, self.pg0, self.pg0.remote_ip4,
2181             required_min_rx=2 * vpp_session.required_min_rx,
2182             desired_min_tx=3 * vpp_session.desired_min_tx,
2183             detect_mult=4 * vpp_session.detect_mult)
2184         self.cli_verify_no_response(
2185             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2186             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2187             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2188              mod_session.desired_min_tx, mod_session.required_min_rx,
2189              mod_session.detect_mult))
2190         verify_bfd_session_config(self, mod_session)
2191         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2192             "peer-addr %s" % (self.pg0.name,
2193                               self.pg0.local_ip4, self.pg0.remote_ip4)
2194         self.cli_verify_no_response(cli_del_cmd)
2195         # 2nd del is expected to fail
2196         self.cli_verify_response(
2197             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2198             " failed, rv=-102:No such BFD object")
2199         self.assertFalse(vpp_session.query_vpp_config())
2200
2201     def test_add_mod_del_bfd_udp6(self):
2202         """ create/modify/delete IPv6 BFD UDP session """
2203         vpp_session = VppBFDUDPSession(
2204             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2205         self.registry.register(vpp_session, self.logger)
2206         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2207             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2208             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2209                                 self.pg0.remote_ip6,
2210                                 vpp_session.desired_min_tx,
2211                                 vpp_session.required_min_rx,
2212                                 vpp_session.detect_mult)
2213         self.cli_verify_no_response(cli_add_cmd)
2214         # 2nd add should fail
2215         self.cli_verify_response(
2216             cli_add_cmd,
2217             "bfd udp session add: `bfd_add_add_session' API call"
2218             " failed, rv=-101:Duplicate BFD object")
2219         verify_bfd_session_config(self, vpp_session)
2220         mod_session = VppBFDUDPSession(
2221             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2222             required_min_rx=2 * vpp_session.required_min_rx,
2223             desired_min_tx=3 * vpp_session.desired_min_tx,
2224             detect_mult=4 * vpp_session.detect_mult)
2225         self.cli_verify_no_response(
2226             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2227             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2228             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2229              mod_session.desired_min_tx,
2230              mod_session.required_min_rx, mod_session.detect_mult))
2231         verify_bfd_session_config(self, mod_session)
2232         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2233             "peer-addr %s" % (self.pg0.name,
2234                               self.pg0.local_ip6, self.pg0.remote_ip6)
2235         self.cli_verify_no_response(cli_del_cmd)
2236         # 2nd del is expected to fail
2237         self.cli_verify_response(
2238             cli_del_cmd,
2239             "bfd udp session del: `bfd_udp_del_session' API call"
2240             " failed, rv=-102:No such BFD object")
2241         self.assertFalse(vpp_session.query_vpp_config())
2242
2243     def test_add_mod_del_bfd_udp_auth(self):
2244         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2245         key = self.factory.create_random_key(self)
2246         key.add_vpp_config()
2247         vpp_session = VppBFDUDPSession(
2248             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2249         self.registry.register(vpp_session, self.logger)
2250         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2251             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2252             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2253             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2254                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2255                vpp_session.detect_mult, key.conf_key_id,
2256                vpp_session.bfd_key_id)
2257         self.cli_verify_no_response(cli_add_cmd)
2258         # 2nd add should fail
2259         self.cli_verify_response(
2260             cli_add_cmd,
2261             "bfd udp session add: `bfd_add_add_session' API call"
2262             " failed, rv=-101:Duplicate BFD object")
2263         verify_bfd_session_config(self, vpp_session)
2264         mod_session = VppBFDUDPSession(
2265             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2266             bfd_key_id=vpp_session.bfd_key_id,
2267             required_min_rx=2 * vpp_session.required_min_rx,
2268             desired_min_tx=3 * vpp_session.desired_min_tx,
2269             detect_mult=4 * vpp_session.detect_mult)
2270         self.cli_verify_no_response(
2271             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2272             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2273             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2274              mod_session.desired_min_tx,
2275              mod_session.required_min_rx, mod_session.detect_mult))
2276         verify_bfd_session_config(self, mod_session)
2277         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2278             "peer-addr %s" % (self.pg0.name,
2279                               self.pg0.local_ip4, self.pg0.remote_ip4)
2280         self.cli_verify_no_response(cli_del_cmd)
2281         # 2nd del is expected to fail
2282         self.cli_verify_response(
2283             cli_del_cmd,
2284             "bfd udp session del: `bfd_udp_del_session' API call"
2285             " failed, rv=-102:No such BFD object")
2286         self.assertFalse(vpp_session.query_vpp_config())
2287
2288     def test_add_mod_del_bfd_udp6_auth(self):
2289         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2290         key = self.factory.create_random_key(
2291             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2292         key.add_vpp_config()
2293         vpp_session = VppBFDUDPSession(
2294             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2295         self.registry.register(vpp_session, self.logger)
2296         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2297             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2298             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2299             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2300                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2301                vpp_session.detect_mult, key.conf_key_id,
2302                vpp_session.bfd_key_id)
2303         self.cli_verify_no_response(cli_add_cmd)
2304         # 2nd add should fail
2305         self.cli_verify_response(
2306             cli_add_cmd,
2307             "bfd udp session add: `bfd_add_add_session' API call"
2308             " failed, rv=-101:Duplicate BFD object")
2309         verify_bfd_session_config(self, vpp_session)
2310         mod_session = VppBFDUDPSession(
2311             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2312             bfd_key_id=vpp_session.bfd_key_id,
2313             required_min_rx=2 * vpp_session.required_min_rx,
2314             desired_min_tx=3 * vpp_session.desired_min_tx,
2315             detect_mult=4 * vpp_session.detect_mult)
2316         self.cli_verify_no_response(
2317             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2318             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2319             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2320              mod_session.desired_min_tx,
2321              mod_session.required_min_rx, mod_session.detect_mult))
2322         verify_bfd_session_config(self, mod_session)
2323         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2324             "peer-addr %s" % (self.pg0.name,
2325                               self.pg0.local_ip6, self.pg0.remote_ip6)
2326         self.cli_verify_no_response(cli_del_cmd)
2327         # 2nd del is expected to fail
2328         self.cli_verify_response(
2329             cli_del_cmd,
2330             "bfd udp session del: `bfd_udp_del_session' API call"
2331             " failed, rv=-102:No such BFD object")
2332         self.assertFalse(vpp_session.query_vpp_config())
2333
2334     def test_auth_on_off(self):
2335         """ turn authentication on and off """
2336         key = self.factory.create_random_key(
2337             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2338         key.add_vpp_config()
2339         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2340         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2341                                         sha1_key=key)
2342         session.add_vpp_config()
2343         cli_activate = \
2344             "bfd udp session auth activate interface %s local-addr %s "\
2345             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2346             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2347                key.conf_key_id, auth_session.bfd_key_id)
2348         self.cli_verify_no_response(cli_activate)
2349         verify_bfd_session_config(self, auth_session)
2350         self.cli_verify_no_response(cli_activate)
2351         verify_bfd_session_config(self, auth_session)
2352         cli_deactivate = \
2353             "bfd udp session auth deactivate interface %s local-addr %s "\
2354             "peer-addr %s "\
2355             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2356         self.cli_verify_no_response(cli_deactivate)
2357         verify_bfd_session_config(self, session)
2358         self.cli_verify_no_response(cli_deactivate)
2359         verify_bfd_session_config(self, session)
2360
2361     def test_auth_on_off_delayed(self):
2362         """ turn authentication on and off (delayed) """
2363         key = self.factory.create_random_key(
2364             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2365         key.add_vpp_config()
2366         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2367         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2368                                         sha1_key=key)
2369         session.add_vpp_config()
2370         cli_activate = \
2371             "bfd udp session auth activate interface %s local-addr %s "\
2372             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2373             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2374                key.conf_key_id, auth_session.bfd_key_id)
2375         self.cli_verify_no_response(cli_activate)
2376         verify_bfd_session_config(self, auth_session)
2377         self.cli_verify_no_response(cli_activate)
2378         verify_bfd_session_config(self, auth_session)
2379         cli_deactivate = \
2380             "bfd udp session auth deactivate interface %s local-addr %s "\
2381             "peer-addr %s delayed yes"\
2382             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2383         self.cli_verify_no_response(cli_deactivate)
2384         verify_bfd_session_config(self, session)
2385         self.cli_verify_no_response(cli_deactivate)
2386         verify_bfd_session_config(self, session)
2387
2388     def test_admin_up_down(self):
2389         """ put session admin-up and admin-down """
2390         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2391         session.add_vpp_config()
2392         cli_down = \
2393             "bfd udp session set-flags admin down interface %s local-addr %s "\
2394             "peer-addr %s "\
2395             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2396         cli_up = \
2397             "bfd udp session set-flags admin up interface %s local-addr %s "\
2398             "peer-addr %s "\
2399             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2400         self.cli_verify_no_response(cli_down)
2401         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2402         self.cli_verify_no_response(cli_up)
2403         verify_bfd_session_config(self, session, state=BFDState.down)
2404
2405     def test_set_del_udp_echo_source(self):
2406         """ set/del udp echo source """
2407         self.create_loopback_interfaces([0])
2408         self.loopback0 = self.lo_interfaces[0]
2409         self.loopback0.admin_up()
2410         self.cli_verify_response("show bfd echo-source",
2411                                  "UDP echo source is not set.")
2412         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2413         self.cli_verify_no_response(cli_set)
2414         self.cli_verify_response("show bfd echo-source",
2415                                  "UDP echo source is: %s\n"
2416                                  "IPv4 address usable as echo source: none\n"
2417                                  "IPv6 address usable as echo source: none" %
2418                                  self.loopback0.name)
2419         self.loopback0.config_ip4()
2420         unpacked = unpack("!L", self.loopback0.local_ip4n)
2421         echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2422         self.cli_verify_response("show bfd echo-source",
2423                                  "UDP echo source is: %s\n"
2424                                  "IPv4 address usable as echo source: %s\n"
2425                                  "IPv6 address usable as echo source: none" %
2426                                  (self.loopback0.name, echo_ip4))
2427         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2428         echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2429                                             unpacked[2], unpacked[3] ^ 1))
2430         self.loopback0.config_ip6()
2431         self.cli_verify_response("show bfd echo-source",
2432                                  "UDP echo source is: %s\n"
2433                                  "IPv4 address usable as echo source: %s\n"
2434                                  "IPv6 address usable as echo source: %s" %
2435                                  (self.loopback0.name, echo_ip4, echo_ip6))
2436         cli_del = "bfd udp echo-source del"
2437         self.cli_verify_no_response(cli_del)
2438         self.cli_verify_response("show bfd echo-source",
2439                                  "UDP echo source is not set.")
2440
2441 if __name__ == '__main__':
2442     unittest.main(testRunner=VppTestRunner)