VPP-540 : pbb tag rewrite details
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5 import unittest
6 import hashlib
7 import binascii
8 import time
9 from random import randint, shuffle
10 from socket import AF_INET, AF_INET6
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet import UDP, IP
14 from scapy.layers.inet6 import IPv6
15 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
16     BFDDiagCode, BFDState
17 from framework import VppTestCase, VppTestRunner
18 from vpp_pg_interface import CaptureTimeoutError
19 from util import ppp
20
21 USEC_IN_SEC = 1000000
22
23
class AuthKeyFactory(object):
    """Factory handing out auth keys whose conf key IDs never repeat.

    Uniqueness is tracked per factory instance only.
    """

    def __init__(self):
        # conf key IDs already issued by this factory (dict used as a set)
        self._conf_key_ids = {}

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ build a key with random material and a not-yet-used conf key ID

        :param test: test case handed through to VppBFDAuthKey
        :param auth_type: authentication type of the created key
        :returns: new VppBFDAuthKey instance
        """
        issued = self._conf_key_ids
        # keep drawing 32-bit IDs until one is found that was not issued yet
        while True:
            candidate = randint(0, 0xFFFFFFFF)
            if candidate not in issued:
                break
        issued[candidate] = 1
        # key material is 1 to 20 random byte values
        key_length = randint(1, 20)
        random_bytes = [randint(0, 255) for _ in range(key_length)]
        material = str(bytearray(random_bytes))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=candidate, key=material)
39
40
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    # packet-generator interfaces used by the tests; expected to be set up
    # by create_pg_interfaces() in setUpClass
    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        """ create two pg interfaces and configure IPv4/IPv6 on them """
        super(BFDAPITestCase, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))
            for i in cls.pg_interfaces:
                i.config_ip4()
                i.config_ip6()
                i.resolve_arp()

        except Exception:
            # undo the class-level setup before propagating the failure
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """ give every test its own key factory """
        super(BFDAPITestCase, self).setUp()
        self.factory = AuthKeyFactory()

    def test_add_bfd(self):
        """ create a BFD session """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # add/remove twice to check that a removed session can be re-added
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        # the second add of the same session is expected to be rejected
        with self.vapi.expect_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        # add/remove twice to check that a removed session can be re-added
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)
        session.add_vpp_config()
        # the dump entry must reflect the parameters the session was
        # created with
        s = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          s.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          s.required_min_rx,
                          "required min receive interval")
        self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
        # double every parameter and compare against a fresh dump entry
        session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
                                  required_min_rx=session.required_min_rx * 2,
                                  detect_mult=session.detect_mult * 2)
        s = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          s.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          s.required_min_rx,
                          "required min receive interval")
        self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(self)
                for _ in range(key_count)]
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        # remove randomly
        # shuffle() works in place, so it needs a real list - a Python 3
        # range object does not support item assignment
        indexes = list(range(key_count))
        shuffle(indexes)
        removed = []
        for i in indexes:
            key = keys[i]
            key.remove_vpp_config()
            removed.append(i)
            # after each removal only the removed keys may be missing
            for j in range(key_count):
                key = keys[j]
                if j in removed:
                    self.assertFalse(key.query_vpp_config())
                else:
                    self.assertTrue(key.query_vpp_config())
        # should be removed now
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        for key in keys:
            key.remove_vpp_config()
        for key in keys:
            self.assertFalse(key.query_vpp_config())

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        # add/remove twice to check that a removed session can be re-added
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        session.add_vpp_config()
        # the second add of the same session must fail
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key object is created locally but never added to vpp
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            sha1_key=self.factory.create_random_key(self))
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ share single SHA1 key between multiple BFD sessions """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             sha1_key=key, af=AF_INET6),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
                             sha1_key=key, af=AF_INET6)]
        for s in sessions:
            s.add_vpp_config()
        # the reported use count must drop by one with every removed session
        removed = 0
        for s in sessions:
            e = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(e.use_count, len(sessions) - removed,
                              "Use count for shared key")
            s.remove_vpp_config()
            removed += 1
        e = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(e.use_count, len(sessions) - removed,
                          "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        # session starts without authentication, the key is attached later
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        key1 = self.factory.create_random_key(self)
        key2 = self.factory.create_random_key(self)
        # defensive only: one factory never issues the same conf key ID twice
        while key2.conf_key_id == key1.conf_key_id:
            key2 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key1)
        session.add_vpp_config()
        # switch the session from key1 to key2
        session.activate_auth(key2)
248
249
class BFDTestSession(object):
    """ BFD session as seen from test framework side """

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None):
        """ initialise the test-side session state

        :param test: test case owning this session (used for logging,
            assertions and access to pg0)
        :param interface: interface whose MAC/IP addresses go into packets
        :param af: address family, AF_INET6 selects IPv6, anything else IPv4
        :param detect_mult: detection multiplier placed into sent packets
        :param sha1_key: auth key, when set packets carry SHA1 authentication
        :param bfd_key_id: key ID written to / expected in the auth section
        :param our_seq_number: initial auth sequence number, random if None
        """
        self.test = test
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        self.udp_sport = randint(49152, 65535)
        if our_seq_number is None:
            self.our_seq_number = randint(0, 40000000)
        else:
            self.our_seq_number = our_seq_number
        # last auth sequence number seen from vpp, None until first packet
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.desired_min_tx = 100000
        self.required_min_rx = 100000
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.your_discriminator = None
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth

    def inc_seq_num(self):
        """ increment sequence number, wrapping if needed """
        if self.our_seq_number == 0xFFFFFFFF:
            self.our_seq_number = 0
        else:
            self.our_seq_number += 1

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ update BFD parameters associated with session

        Only arguments which are not None are applied. Zero is a legitimate
        value (e.g. required_min_rx=0), so arguments are compared against
        None instead of being tested for truthiness.
        """
        if my_discriminator is not None:
            self.my_discriminator = my_discriminator
        if your_discriminator is not None:
            self.your_discriminator = your_discriminator
        if required_min_rx is not None:
            self.required_min_rx = required_min_rx
        if desired_min_tx is not None:
            self.desired_min_tx = desired_min_tx
        if detect_mult is not None:
            self.detect_mult = detect_mult
        if diag is not None:
            self.diag = diag
        if state is not None:
            self.state = state
        if auth_type is not None:
            self.auth_type = auth_type

    def fill_packet_fields(self, packet):
        """ set packet fields with known values in packet

        Every session value which is not None is written to the BFD layer,
        including zero values. The auth type is the exception, see below.

        :param packet: packet containing a BFD layer, modified in place
        """
        bfd = packet[BFD]
        if self.my_discriminator is not None:
            self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
                                   self.my_discriminator)
            bfd.my_discriminator = self.my_discriminator
        if self.your_discriminator is not None:
            self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
                                   self.your_discriminator)
            bfd.your_discriminator = self.your_discriminator
        if self.required_min_rx is not None:
            self.test.logger.debug(
                "BFD: setting packet.required_min_rx_interval=%s",
                self.required_min_rx)
            bfd.required_min_rx_interval = self.required_min_rx
        if self.desired_min_tx is not None:
            self.test.logger.debug(
                "BFD: setting packet.desired_min_tx_interval=%s",
                self.desired_min_tx)
            bfd.desired_min_tx_interval = self.desired_min_tx
        if self.detect_mult is not None:
            self.test.logger.debug(
                "BFD: setting packet.detect_mult=%s", self.detect_mult)
            bfd.detect_mult = self.detect_mult
        if self.diag is not None:
            self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
            bfd.diag = self.diag
        if self.state is not None:
            self.test.logger.debug("BFD: setting packet.state=%s", self.state)
            bfd.state = self.state
        # deliberately a truthiness test: create_packet() has already set the
        # auth type from the SHA1 key and the initial BFDAuthType.no_auth
        # stored in the session must not overwrite it
        if self.auth_type:
            # this is used by a negative test-case
            self.test.logger.debug("BFD: setting packet.auth_type=%s",
                                   self.auth_type)
            bfd.auth_type = self.auth_type

    def create_packet(self):
        """ create a BFD packet, reflecting the current state of session

        :returns: Ether / IP or IPv6 / UDP / BFD packet, with the SHA1 hash
            filled in when the session has a key
        """
        if self.sha1_key:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()
        if self.af == AF_INET6:
            packet = (Ether(src=self.interface.remote_mac,
                            dst=self.interface.local_mac) /
                      IPv6(src=self.interface.remote_ip6,
                           dst=self.interface.local_ip6,
                           hlim=255) /
                      UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                      bfd)
        else:
            packet = (Ether(src=self.interface.remote_mac,
                            dst=self.interface.local_mac) /
                      IP(src=self.interface.remote_ip4,
                         dst=self.interface.local_ip4,
                         ttl=255) /
                      UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                      bfd)
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash covers the first 32 bytes of the serialized BFD layer
            # followed by the key padded with zeros to 20 bytes
            hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
                "\0" * (20 - len(self.sha1_key.key))
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ send packet on interface, creating the packet if needed

        :param packet: packet to send, built by create_packet() if None
        :param interface: interface to send on, test.pg0 if None
        """
        if packet is None:
            packet = self.create_packet()
        if interface is None:
            interface = self.test.pg0
        self.test.logger.debug(ppp("Sending packet:", packet))
        interface.add_stream(packet)
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ Verify correctness of authentication in BFD layer.

        Checks the auth section header, the progression of the sequence
        number relative to the previously received one and the SHA1 hash.
        """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
        self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
                               BFDAuthType)
        self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
        if self.vpp_seq_number is None:
            # first authenticated packet - nothing to compare against yet
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug("Received initial sequence number: %s" %
                                   self.vpp_seq_number)
        else:
            recvd_seq_num = bfd.auth_seq_num
            self.test.logger.debug("Received followup sequence number: %s" %
                                   recvd_seq_num)
            # meticulous mode requires an increment by exactly one, otherwise
            # the number may also stay the same
            if self.vpp_seq_number < 0xffffffff:
                if self.sha1_key.auth_type == \
                        BFDAuthType.meticulous_keyed_sha1:
                    self.test.assert_equal(recvd_seq_num,
                                           self.vpp_seq_number + 1,
                                           "BFD sequence number")
                else:
                    self.test.assert_in_range(recvd_seq_num,
                                              self.vpp_seq_number,
                                              self.vpp_seq_number + 1,
                                              "BFD sequence number")
            else:
                # previous number was the maximum, next value wraps to zero
                if self.sha1_key.auth_type == \
                        BFDAuthType.meticulous_keyed_sha1:
                    self.test.assert_equal(recvd_seq_num, 0,
                                           "BFD sequence number")
                else:
                    self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
                                       "BFD sequence number not one of "
                                       "(%s, 0)" % self.vpp_seq_number)
            self.vpp_seq_number = recvd_seq_num
        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        hash_material = bfd.original[:-20] + self.sha1_key.key + \
            "\0" * (20 - len(self.sha1_key.key))
        expected_hash = hashlib.sha1(hash_material).hexdigest()
        self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
                               expected_hash, "Auth key hash")

    def verify_bfd(self, packet):
        """ Verify correctness of BFD layer.

        Checks version and that vpp echoes our discriminator; when the
        session has a key, the authentication section is verified too.
        """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(bfd.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
441
442
def bfd_session_up(test):
    """ Bring BFD session up

    Waits for a packet from vpp, records the clock offset between the test
    and the packet timestamp, answers with Init and, once vpp reports the
    Up event, switches the test session to Up as well.
    """
    log = test.logger
    log.info("BFD: Waiting for slow hello")
    hello = wait_for_bfd_packet(test, 2)
    # offset measured by an earlier call, if any
    previous_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - hello.time
    log.debug("BFD: Calculated vpp clock offset: %s", test.vpp_clock_offset)
    if previous_offset:
        test.assertAlmostEqual(
            previous_offset, test.vpp_clock_offset, delta=0.1,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, previous_offset))
    log.info("BFD: Sending Init")
    session = test.test_session
    session.update(my_discriminator=randint(0, 40000000),
                   your_discriminator=hello[BFD].my_discriminator,
                   state=BFDState.init)
    session.send_packet()
    log.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.up)
    log.info("BFD: Session is Up")
    session.update(state=BFDState.up)
    session.send_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
470
471
def bfd_session_down(test):
    """ Bring BFD session down

    Requires the vpp session to be Up; sends a Down packet from the test
    session and verifies that vpp reports the Down event and state.
    """
    vpp_session = test.vpp_session
    test.assert_equal(vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(vpp_session.state, BFDState.down, BFDState)
482
483
def verify_ip(test, packet):
    """ Verify correctness of IP layer.

    Checks TTL / hop limit of 255 and that the packet goes from the local
    to the remote address of pg0, for the address family of the vpp session.
    """
    intf = test.pg0
    if test.vpp_session.af != AF_INET6:
        ip = packet[IP]
        expected_src = intf.local_ip4
        expected_dst = intf.remote_ip4
        test.assert_equal(ip.ttl, 255, "IPv4 TTL")
    else:
        ip = packet[IPv6]
        expected_src = intf.local_ip6
        expected_dst = intf.remote_ip6
        test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
    test.assert_equal(ip.src, expected_src, "IP source address")
    test.assert_equal(ip.dst, expected_dst, "IP destination address")
498
499
def verify_udp(test, packet):
    """ Verify correctness of UDP layer.

    The destination port must be the BFD port and the source port must lie
    within the BFD source port range.
    """
    layer = packet[UDP]
    test.assert_equal(layer.dport, BFD.udp_dport, "UDP destination port")
    lowest, highest = BFD.udp_sport_min, BFD.udp_sport_max
    test.assert_in_range(layer.sport, lowest, highest, "UDP source port")
506
507
def verify_event(test, event, expected_state):
    """ Verify correctness of event values.

    :param test: test case providing logger, assertions and vpp_session
    :param event: BFD event to check
    :param expected_state: BFD state the event must carry
    """
    test.logger.debug("BFD: Event: %s" % repr(event))
    vpp_session = test.vpp_session
    test.assert_equal(event.sw_if_index,
                      vpp_session.interface.sw_if_index,
                      "BFD interface index")
    expected_is_ipv6 = 1 if vpp_session.af == AF_INET6 else 0
    test.assert_equal(event.is_ipv6, expected_is_ipv6, "is_ipv6")
    if vpp_session.af != AF_INET:
        test.assert_equal(event.local_addr, vpp_session.local_addr_n,
                          "Local IPv6 address")
        test.assert_equal(event.peer_addr, vpp_session.peer_addr_n,
                          "Peer IPv6 address")
    else:
        # for IPv4 only the first four bytes of the address fields are
        # compared
        test.assert_equal(event.local_addr[:4], vpp_session.local_addr_n,
                          "Local IPv4 address")
        test.assert_equal(event.peer_addr[:4], vpp_session.peer_addr_n,
                          "Peer IPv4 address")
    test.assert_equal(event.state, expected_state, BFDState)
530
531
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
    """ wait for BFD packet and verify its correctness

    :param test: test case providing logger, pg0 and test_session
    :param timeout: how long to wait, in seconds
    :param pcap_time_min: ignore packets with pcap timestamp lower than this

    :returns: the captured packet
    :raises CaptureTimeoutError: if the deadline passes before a packet
        which is not ignored is captured
    :raises Exception: if the packet has no BFD layer or the BFD layer
        carries a payload
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        # NOTE(review): with timeout=0 the deadline has usually passed by
        # the time it is checked here, so no capture is attempted at all -
        # confirm this is what callers passing timeout=0 expect
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    # getlayer() yields None for a missing layer, whereas p[BFD] would raise
    # IndexError and the check below could never trigger
    bfd = p.getlayer(BFD)
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
567
568
569 class BFD4TestCase(VppTestCase):
570     """Bidirectional Forwarding Detection (BFD)"""
571
572     pg0 = None
573     vpp_clock_offset = None
574     vpp_session = None
575     test_session = None
576
577     @classmethod
578     def setUpClass(cls):
579         super(BFD4TestCase, cls).setUpClass()
580         try:
581             cls.create_pg_interfaces([0])
582             cls.pg0.config_ip4()
583             cls.pg0.configure_ipv4_neighbors()
584             cls.pg0.admin_up()
585             cls.pg0.resolve_arp()
586
587         except Exception:
588             super(BFD4TestCase, cls).tearDownClass()
589             raise
590
591     def setUp(self):
592         super(BFD4TestCase, self).setUp()
593         self.factory = AuthKeyFactory()
594         self.vapi.want_bfd_events()
595         self.pg0.enable_capture()
596         try:
597             self.vpp_session = VppBFDUDPSession(self, self.pg0,
598                                                 self.pg0.remote_ip4)
599             self.vpp_session.add_vpp_config()
600             self.vpp_session.admin_up()
601             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
602         except:
603             self.vapi.want_bfd_events(enable_disable=0)
604             raise
605
    def tearDown(self):
        # undo the event subscription made in setUp; skipped when vpp is
        # flagged as dead
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD4TestCase, self).tearDown()
611
    def test_session_up(self):
        """ bring BFD session up """
        # all verification happens inside the shared helper
        bfd_session_up(self)
615
    def test_session_down(self):
        """ bring BFD session down """
        # the session has to be Up first, bfd_session_down() asserts that
        bfd_session_up(self)
        bfd_session_down(self)
620
621     def test_hold_up(self):
622         """ hold BFD session up """
623         bfd_session_up(self)
624         for dummy in range(self.test_session.detect_mult * 2):
625             wait_for_bfd_packet(self)
626             self.test_session.send_packet()
627         self.assert_equal(len(self.vapi.collect_events()), 0,
628                           "number of bfd events")
629
630     def test_slow_timer(self):
631         """ verify slow periodic control frames while session down """
632         packet_count = 3
633         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
634         prev_packet = wait_for_bfd_packet(self, 2)
635         for dummy in range(packet_count):
636             next_packet = wait_for_bfd_packet(self, 2)
637             time_diff = next_packet.time - prev_packet.time
638             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
639             # to work around timing issues
640             self.assert_in_range(
641                 time_diff, 0.70, 1.05, "time between slow packets")
642             prev_packet = next_packet
643
644     def test_zero_remote_min_rx(self):
645         """ no packets when zero remote required min rx interval """
646         bfd_session_up(self)
647         self.test_session.update(required_min_rx=0)
648         self.test_session.send_packet()
649         cap = 2 * self.vpp_session.desired_min_tx *\
650             self.test_session.detect_mult
651         time_mark = time.time()
652         count = 0
653         # busy wait here, trying to collect a packet or event, vpp is not
654         # allowed to send packets and the session will timeout first - so the
655         # Up->Down event must arrive before any packets do
656         while time.time() < time_mark + cap / USEC_IN_SEC:
657             try:
658                 p = wait_for_bfd_packet(
659                     self, timeout=0,
660                     pcap_time_min=time_mark - self.vpp_clock_offset)
661                 self.logger.error(ppp("Received unexpected packet:", p))
662                 count += 1
663             except CaptureTimeoutError:
664                 pass
665             events = self.vapi.collect_events()
666             if len(events) > 0:
667                 verify_event(self, events[0], BFDState.down)
668                 break
669         self.assert_equal(count, 0, "number of packets received")
670
671     def test_conn_down(self):
672         """ verify session goes down after inactivity """
673         bfd_session_up(self)
674         detection_time = self.vpp_session.detect_mult *\
675             self.vpp_session.required_min_rx / USEC_IN_SEC
676         self.sleep(detection_time, "waiting for BFD session time-out")
677         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
678         verify_event(self, e, expected_state=BFDState.down)
679
680     def test_large_required_min_rx(self):
681         """ large remote required min rx interval """
682         bfd_session_up(self)
683         p = wait_for_bfd_packet(self)
684         interval = 3000000
685         self.test_session.update(required_min_rx=interval)
686         self.test_session.send_packet()
687         time_mark = time.time()
688         count = 0
689         # busy wait here, trying to collect a packet or event, vpp is not
690         # allowed to send packets and the session will timeout first - so the
691         # Up->Down event must arrive before any packets do
692         while time.time() < time_mark + interval / USEC_IN_SEC:
693             try:
694                 p = wait_for_bfd_packet(self, timeout=0)
695                 # if vpp managed to send a packet before we did the session
696                 # session update, then that's fine, ignore it
697                 if p.time < time_mark - self.vpp_clock_offset:
698                     continue
699                 self.logger.error(ppp("Received unexpected packet:", p))
700                 count += 1
701             except CaptureTimeoutError:
702                 pass
703             events = self.vapi.collect_events()
704             if len(events) > 0:
705                 verify_event(self, events[0], BFDState.down)
706                 break
707         self.assert_equal(count, 0, "number of packets received")
708
709     def test_immediate_remote_min_rx_reduction(self):
710         """ immediately honor remote required min rx reduction """
711         self.vpp_session.remove_vpp_config()
712         self.vpp_session = VppBFDUDPSession(
713             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
714         self.pg0.enable_capture()
715         self.vpp_session.add_vpp_config()
716         self.test_session.update(desired_min_tx=1000000,
717                                  required_min_rx=1000000)
718         bfd_session_up(self)
719         reference_packet = wait_for_bfd_packet(self)
720         time_mark = time.time()
721         interval = 300000
722         self.test_session.update(required_min_rx=interval)
723         self.test_session.send_packet()
724         extra_time = time.time() - time_mark
725         p = wait_for_bfd_packet(self)
726         # first packet is allowed to be late by time we spent doing the update
727         # calculated in extra_time
728         self.assert_in_range(p.time - reference_packet.time,
729                              .95 * 0.75 * interval / USEC_IN_SEC,
730                              1.05 * interval / USEC_IN_SEC + extra_time,
731                              "time between BFD packets")
732         reference_packet = p
733         for dummy in range(3):
734             p = wait_for_bfd_packet(self)
735             diff = p.time - reference_packet.time
736             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
737                                  1.05 * interval / USEC_IN_SEC,
738                                  "time between BFD packets")
739             reference_packet = p
740
741     def test_modify_req_min_rx_double(self):
742         """ modify session - double required min rx """
743         bfd_session_up(self)
744         p = wait_for_bfd_packet(self)
745         self.test_session.update(desired_min_tx=10000,
746                                  required_min_rx=10000)
747         self.test_session.send_packet()
748         # double required min rx
749         self.vpp_session.modify_parameters(
750             required_min_rx=2 * self.vpp_session.required_min_rx)
751         p = wait_for_bfd_packet(
752             self, pcap_time_min=time.time() - self.vpp_clock_offset)
753         # poll bit needs to be set
754         self.assertIn("P", p.sprintf("%BFD.flags%"),
755                       "Poll bit not set in BFD packet")
756         # finish poll sequence with final packet
757         final = self.test_session.create_packet()
758         final[BFD].flags = "F"
759         timeout = self.test_session.detect_mult * \
760             max(self.test_session.desired_min_tx,
761                 self.vpp_session.required_min_rx) / USEC_IN_SEC
762         self.test_session.send_packet(final)
763         time_mark = time.time()
764         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
765         verify_event(self, e, expected_state=BFDState.down)
766         time_to_event = time.time() - time_mark
767         self.assert_in_range(time_to_event, .9 * timeout,
768                              1.1 * timeout, "session timeout")
769
770     def test_modify_req_min_rx_halve(self):
771         """ modify session - halve required min rx """
772         self.vpp_session.modify_parameters(
773             required_min_rx=2 * self.vpp_session.required_min_rx)
774         bfd_session_up(self)
775         p = wait_for_bfd_packet(self)
776         self.test_session.update(desired_min_tx=10000,
777                                  required_min_rx=10000)
778         self.test_session.send_packet()
779         p = wait_for_bfd_packet(
780             self, pcap_time_min=time.time() - self.vpp_clock_offset)
781         # halve required min rx
782         old_required_min_rx = self.vpp_session.required_min_rx
783         self.vpp_session.modify_parameters(
784             required_min_rx=0.5 * self.vpp_session.required_min_rx)
785         # now we wait 0.8*3*old-req-min-rx and the session should still be up
786         self.sleep(0.8 * self.vpp_session.detect_mult *
787                    old_required_min_rx / USEC_IN_SEC)
788         self.assert_equal(len(self.vapi.collect_events()), 0,
789                           "number of bfd events")
790         p = wait_for_bfd_packet(self)
791         # poll bit needs to be set
792         self.assertIn("P", p.sprintf("%BFD.flags%"),
793                       "Poll bit not set in BFD packet")
794         # finish poll sequence with final packet
795         final = self.test_session.create_packet()
796         final[BFD].flags = "F"
797         self.test_session.send_packet(final)
798         # now the session should time out under new conditions
799         before = time.time()
800         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
801         after = time.time()
802         detection_time = self.vpp_session.detect_mult *\
803             self.vpp_session.required_min_rx / USEC_IN_SEC
804         self.assert_in_range(after - before,
805                              0.9 * detection_time,
806                              1.1 * detection_time,
807                              "time before bfd session goes down")
808         verify_event(self, e, expected_state=BFDState.down)
809
810     def test_modify_detect_mult(self):
811         """ modify detect multiplier """
812         bfd_session_up(self)
813         p = wait_for_bfd_packet(self)
814         self.vpp_session.modify_parameters(detect_mult=1)
815         p = wait_for_bfd_packet(
816             self, pcap_time_min=time.time() - self.vpp_clock_offset)
817         self.assert_equal(self.vpp_session.detect_mult,
818                           p[BFD].detect_mult,
819                           "detect mult")
820         # poll bit must not be set
821         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
822                          "Poll bit not set in BFD packet")
823         self.vpp_session.modify_parameters(detect_mult=10)
824         p = wait_for_bfd_packet(
825             self, pcap_time_min=time.time() - self.vpp_clock_offset)
826         self.assert_equal(self.vpp_session.detect_mult,
827                           p[BFD].detect_mult,
828                           "detect mult")
829         # poll bit must not be set
830         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
831                          "Poll bit not set in BFD packet")
832
833     def test_no_periodic_if_remote_demand(self):
834         """ no periodic frames outside poll sequence if remote demand set """
835         bfd_session_up(self)
836         demand = self.test_session.create_packet()
837         demand[BFD].flags = "D"
838         self.test_session.send_packet(demand)
839         transmit_time = 0.9 \
840             * max(self.vpp_session.required_min_rx,
841                   self.test_session.desired_min_tx) \
842             / USEC_IN_SEC
843         count = 0
844         for dummy in range(self.test_session.detect_mult * 2):
845             time.sleep(transmit_time)
846             self.test_session.send_packet(demand)
847             try:
848                 p = wait_for_bfd_packet(self, timeout=0)
849                 self.logger.error(ppp("Received unexpected packet:", p))
850                 count += 1
851             except CaptureTimeoutError:
852                 pass
853         events = self.vapi.collect_events()
854         for e in events:
855             self.logger.error("Received unexpected event: %s", e)
856         self.assert_equal(count, 0, "number of packets received")
857         self.assert_equal(len(events), 0, "number of events received")
858
859     def test_echo_looped_back(self):
860         """ echo packets looped back """
861         # don't need a session in this case..
862         self.vpp_session.remove_vpp_config()
863         self.pg0.enable_capture()
864         echo_packet_count = 10
865         # random source port low enough to increment a few times..
866         udp_sport_tx = randint(1, 50000)
867         udp_sport_rx = udp_sport_tx
868         echo_packet = (Ether(src=self.pg0.remote_mac,
869                              dst=self.pg0.local_mac) /
870                        IP(src=self.pg0.remote_ip4,
871                           dst=self.pg0.local_ip4) /
872                        UDP(dport=BFD.udp_dport_echo) /
873                        Raw("this should be looped back"))
874         for dummy in range(echo_packet_count):
875             self.sleep(.01, "delay between echo packets")
876             echo_packet[UDP].sport = udp_sport_tx
877             udp_sport_tx += 1
878             self.logger.debug(ppp("Sending packet:", echo_packet))
879             self.pg0.add_stream(echo_packet)
880             self.pg_start()
881         for dummy in range(echo_packet_count):
882             p = self.pg0.wait_for_packet(1)
883             self.logger.debug(ppp("Got packet:", p))
884             ether = p[Ether]
885             self.assert_equal(self.pg0.remote_mac,
886                               ether.dst, "Destination MAC")
887             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
888             ip = p[IP]
889             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
890             self.assert_equal(self.pg0.local_ip4, ip.src, "Destination IP")
891             udp = p[UDP]
892             self.assert_equal(udp.dport, BFD.udp_dport_echo,
893                               "UDP destination port")
894             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
895             udp_sport_rx += 1
896             self.assertTrue(p.haslayer(Raw) and p[Raw] == echo_packet[Raw],
897                             "Received packet is not the echo packet sent")
898         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
899                           "ECHO packet identifier for test purposes)")
900
901     def test_admin_up_down(self):
902         bfd_session_up(self)
903         self.vpp_session.admin_down()
904         self.pg0.enable_capture()
905         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
906         verify_event(self, e, expected_state=BFDState.admin_down)
907         for dummy in range(2):
908             p = wait_for_bfd_packet(self)
909             self.assert_equal(BFDState.admin_down, p[BFD].state, BFDState)
910         # try to bring session up - shouldn't be possible
911         self.test_session.update(state=BFDState.init)
912         self.test_session.send_packet()
913         for dummy in range(2):
914             p = wait_for_bfd_packet(self)
915             self.assert_equal(BFDState.admin_down, p[BFD].state, BFDState)
916         self.vpp_session.admin_up()
917         self.test_session.update(state=BFDState.down)
918         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
919         verify_event(self, e, expected_state=BFDState.down)
920         p = wait_for_bfd_packet(self)
921         self.assert_equal(BFDState.down, p[BFD].state, BFDState)
922         self.test_session.send_packet()
923         p = wait_for_bfd_packet(self)
924         self.assert_equal(BFDState.init, p[BFD].state, BFDState)
925         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
926         verify_event(self, e, expected_state=BFDState.init)
927         self.test_session.update(state=BFDState.up)
928         self.test_session.send_packet()
929         p = wait_for_bfd_packet(self)
930         self.assert_equal(BFDState.up, p[BFD].state, BFDState)
931         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
932         verify_event(self, e, expected_state=BFDState.up)
933
934
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # placeholders, replaced with real objects while the tests run
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Create pg0 and bring it up with IPv6 addressing and neighbors.

        If any step fails, the base class tearDownClass() is called before
        the exception is re-raised.
        """
        super(BFD6TestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()

        except Exception:
            super(BFD6TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """Subscribe to BFD events and create the IPv6 VPP/test sessions.

        The VPP session is configured towards pg0's remote IPv6 address and
        set admin-up; the matching test-side session is created as well.
        """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except:
            # deliberately bare: undo the event subscription on any failure,
            # then let the original exception propagate
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """Unsubscribe from BFD events (if VPP is alive) and drain events."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_hold_up(self):
        """ hold BFD session up

        Answers every VPP packet for twice the detect multiplier and checks
        that no event was reported and the session is still Up.
        """
        bfd_session_up(self)
        for dummy in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back

        Sends ten UDP packets to the BFD echo port, each with a source port
        one higher than the previous.  Every captured packet is checked to
        have its MAC and IPv6 addresses swapped relative to the sent packet,
        with ports and payload unchanged, in the order they were sent.
        """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.local_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        for dummy in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            # the source port doubles as a per-packet identifier
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for dummy in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            # this check is on the source address, so label it as such
            self.assert_equal(self.pg0.local_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            self.assertTrue(p.haslayer(Raw) and p[Raw] == echo_packet[Raw],
                            "Received packet is not the echo packet sent")
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")
1035
1036
class BFDSHA1TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """

    # placeholders, replaced with real objects while the tests run
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Create pg0 and bring it up with IPv4 addressing and ARP resolved.

        If any step fails, the base class tearDownClass() is called before
        the exception is re-raised.
        """
        super(BFDSHA1TestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            super(BFDSHA1TestCase, cls).tearDownClass()
            raise

    def setUp(self):
        """Create a key factory, subscribe to BFD events, start capture.

        Sessions are not created here; each test builds its own because the
        keys and authentication types differ per test.
        """
        super(BFDSHA1TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """Unsubscribe from BFD events (if VPP is alive) and drain events."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDSHA1TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4,
                                            sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)

    def test_hold_up(self):
        """ hold BFD session up

        Answers every VPP packet for twice the detect multiplier and checks
        that the session is still Up afterwards.
        """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4,
                                            sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        for dummy in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up_meticulous(self):
        """ hold BFD session up - meticulous auth

        The test session starts four below the 32-bit maximum sequence
        number and increments it for each of 30 packets, so the sequence
        number wraps around while the session must stay Up.
        """
        key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        # specify sequence number so that it wraps
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
            our_seq_number=0xFFFFFFFF - 4)
        bfd_session_up(self)
        for dummy in range(30):
            wait_for_bfd_packet(self)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_send_bad_seq_number(self):
        """ session is not kept alive by msgs with bad seq numbers

        With meticulous authentication the test keeps answering VPP packets
        without ever incrementing its sequence number.  No event may be seen
        until one detection time has elapsed; after one more exchange,
        exactly one event (transition to Down) is expected.
        """
        key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        detection_time = self.vpp_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        session_timeout = time.time() + detection_time
        while time.time() < session_timeout:
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        wait_for_bfd_packet(self)
        self.test_session.send_packet()
        e = self.vapi.collect_events()
        # session should be down now, because the sequence numbers weren't
        # updated
        self.assert_equal(len(e), 1, "number of bfd events")
        verify_event(self, e[0], expected_state=BFDState.down)

    def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                       legitimate_test_session,
                                       rogue_test_session,
                                       rogue_bfd_values=None):
        """ execute a rogue session interaction scenario

        1. create vpp session, add config
        2. bring the legitimate session up
        3. copy the bfd values from legitimate session to rogue session
        4. apply rogue_bfd_values to rogue session
        5. set rogue session state to down
        6. send message to take the session down from the rogue session
        7. assert that the legitimate session is unaffected

        :param vpp_bfd_udp_session: VPP-side session; its config is added
            and it is set admin-up here
        :param legitimate_test_session: test-side session used to bring the
            VPP session up
        :param rogue_test_session: test-side session used to send the
            packet that must be ignored
        :param rogue_bfd_values: optional dict of keyword arguments passed
            to rogue_test_session.update() after the legitimate values were
            copied
        """

        self.vpp_session = vpp_bfd_udp_session
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = legitimate_test_session
        # bring vpp session up
        bfd_session_up(self)
        # send packet from rogue session
        rogue_test_session.update(
            my_discriminator=self.test_session.my_discriminator,
            your_discriminator=self.test_session.your_discriminator,
            desired_min_tx=self.test_session.desired_min_tx,
            required_min_rx=self.test_session.required_min_rx,
            detect_mult=self.test_session.detect_mult,
            diag=self.test_session.diag,
            state=self.test_session.state,
            auth_type=self.test_session.auth_type)
        if rogue_bfd_values:
            rogue_test_session.update(**rogue_bfd_values)
        rogue_test_session.update(state=BFDState.down)
        rogue_test_session.send_packet()
        wait_for_bfd_packet(self)
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_mismatch_auth(self):
        """ session is not brought down by unauthenticated msg """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        # the rogue session carries no key at all
        rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
        self.execute_rogue_session_scenario(vpp_session,
                                            legitimate_test_session,
                                            rogue_test_session)

    def test_mismatch_bfd_key_id(self):
        """ session is not brought down by msg with non-existent key-id """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        # pick a different random bfd key id
        x = randint(0, 255)
        while x == vpp_session.bfd_key_id:
            x = randint(0, 255)
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        rogue_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
        self.execute_rogue_session_scenario(vpp_session,
                                            legitimate_test_session,
                                            rogue_test_session)

    def test_mismatched_auth_type(self):
        """ session is not brought down by msg with wrong auth type """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        rogue_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=vpp_session.bfd_key_id)
        # same key and key id, but the rogue packet claims keyed MD5
        self.execute_rogue_session_scenario(
            vpp_session, legitimate_test_session, rogue_test_session,
            {'auth_type': BFDAuthType.keyed_md5})

    def test_restart(self):
        """ simulate remote peer restart and resynchronization

        After the session is up, the test stays silent until VPP reports the
        session Down, then resets its own sequence-number state as a
        restarted peer would and brings the session up again.
        """
        key = self.factory.create_random_key(
            self, BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
        bfd_session_up(self)
        # don't send any packets for 2*detection_time - sleeping for exactly
        # one detection time would race with the arrival of the Down event
        detection_time = self.vpp_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(2 * detection_time, "simulating peer restart")
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)
        self.test_session.update(state=BFDState.down)
        # reset sequence number
        self.test_session.our_seq_number = 0
        self.test_session.vpp_seq_number = None
        # now throw away any pending packets
        self.pg0.enable_capture()
        bfd_session_up(self)
1266
1267
1268 class BFDAuthOnOffTestCase(VppTestCase):
1269     """Bidirectional Forwarding Detection (BFD) (changing auth) """
1270
1271     pg0 = None
1272     vpp_session = None
1273     test_session = None
1274
    @classmethod
    def setUpClass(cls):
        """Create pg0 and bring it up with IPv4 addressing and ARP resolved.

        If any step fails, the base class tearDownClass() is called before
        the exception is re-raised.
        """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # release whatever the base class set up, then report the failure
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise
1287
    def setUp(self):
        """Create a key factory, subscribe to BFD events, start capture."""
        super(BFDAuthOnOffTestCase, self).setUp()
        # fresh factory per test, used by the tests to create auth keys
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
1293
    def tearDown(self):
        """Unsubscribe from BFD events (if VPP is alive) and drain events."""
        # only talk to VPP if it has not died during the test
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()
1299
1300     def test_auth_on_immediate(self):
1301         """ turn auth on without disturbing session state (immediate) """
1302         key = self.factory.create_random_key(self)
1303         key.add_vpp_config()
1304         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1305                                             self.pg0.remote_ip4)
1306         self.vpp_session.add_vpp_config()
1307         self.vpp_session.admin_up()
1308         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1309         bfd_session_up(self)
1310         for dummy in range(self.test_session.detect_mult * 2):
1311             p = wait_for_bfd_packet(self)
1312             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1313             self.test_session.send_packet()
1314         self.vpp_session.activate_auth(key)
1315         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1316         self.test_session.sha1_key = key
1317         for dummy in range(self.test_session.detect_mult * 2):
1318             p = wait_for_bfd_packet(self)
1319             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1320             self.test_session.send_packet()
1321         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1322         self.assert_equal(len(self.vapi.collect_events()), 0,
1323                           "number of bfd events")
1324
1325     def test_auth_off_immediate(self):
1326         """ turn auth off without disturbing session state (immediate) """
1327         key = self.factory.create_random_key(self)
1328         key.add_vpp_config()
1329         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1330                                             self.pg0.remote_ip4, sha1_key=key)
1331         self.vpp_session.add_vpp_config()
1332         self.vpp_session.admin_up()
1333         self.test_session = BFDTestSession(
1334             self, self.pg0, AF_INET, sha1_key=key,
1335             bfd_key_id=self.vpp_session.bfd_key_id)
1336         bfd_session_up(self)
1337         # self.vapi.want_bfd_events(enable_disable=0)
1338         for dummy in range(self.test_session.detect_mult * 2):
1339             p = wait_for_bfd_packet(self)
1340             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1341             self.test_session.inc_seq_num()
1342             self.test_session.send_packet()
1343         self.vpp_session.deactivate_auth()
1344         self.test_session.bfd_key_id = None
1345         self.test_session.sha1_key = None
1346         for dummy in range(self.test_session.detect_mult * 2):
1347             p = wait_for_bfd_packet(self)
1348             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1349             self.test_session.inc_seq_num()
1350             self.test_session.send_packet()
1351         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1352         self.assert_equal(len(self.vapi.collect_events()), 0,
1353                           "number of bfd events")
1354
1355     def test_auth_change_key_immediate(self):
1356         """ change auth key without disturbing session state (immediate) """
1357         key1 = self.factory.create_random_key(self)
1358         key1.add_vpp_config()
1359         key2 = self.factory.create_random_key(self)
1360         key2.add_vpp_config()
1361         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1362                                             self.pg0.remote_ip4, sha1_key=key1)
1363         self.vpp_session.add_vpp_config()
1364         self.vpp_session.admin_up()
1365         self.test_session = BFDTestSession(
1366             self, self.pg0, AF_INET, sha1_key=key1,
1367             bfd_key_id=self.vpp_session.bfd_key_id)
1368         bfd_session_up(self)
1369         for dummy in range(self.test_session.detect_mult * 2):
1370             p = wait_for_bfd_packet(self)
1371             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1372             self.test_session.send_packet()
1373         self.vpp_session.activate_auth(key2)
1374         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1375         self.test_session.sha1_key = key2
1376         for dummy in range(self.test_session.detect_mult * 2):
1377             p = wait_for_bfd_packet(self)
1378             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1379             self.test_session.send_packet()
1380         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1381         self.assert_equal(len(self.vapi.collect_events()), 0,
1382                           "number of bfd events")
1383
1384     def test_auth_on_delayed(self):
1385         """ turn auth on without disturbing session state (delayed) """
1386         key = self.factory.create_random_key(self)
1387         key.add_vpp_config()
1388         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1389                                             self.pg0.remote_ip4)
1390         self.vpp_session.add_vpp_config()
1391         self.vpp_session.admin_up()
1392         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1393         bfd_session_up(self)
1394         for dummy in range(self.test_session.detect_mult * 2):
1395             wait_for_bfd_packet(self)
1396             self.test_session.send_packet()
1397         self.vpp_session.activate_auth(key, delayed=True)
1398         for dummy in range(self.test_session.detect_mult * 2):
1399             p = wait_for_bfd_packet(self)
1400             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1401             self.test_session.send_packet()
1402         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1403         self.test_session.sha1_key = key
1404         self.test_session.send_packet()
1405         for dummy in range(self.test_session.detect_mult * 2):
1406             p = wait_for_bfd_packet(self)
1407             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1408             self.test_session.send_packet()
1409         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1410         self.assert_equal(len(self.vapi.collect_events()), 0,
1411                           "number of bfd events")
1412
1413     def test_auth_off_delayed(self):
1414         """ turn auth off without disturbing session state (delayed) """
1415         key = self.factory.create_random_key(self)
1416         key.add_vpp_config()
1417         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1418                                             self.pg0.remote_ip4, sha1_key=key)
1419         self.vpp_session.add_vpp_config()
1420         self.vpp_session.admin_up()
1421         self.test_session = BFDTestSession(
1422             self, self.pg0, AF_INET, sha1_key=key,
1423             bfd_key_id=self.vpp_session.bfd_key_id)
1424         bfd_session_up(self)
1425         for dummy in range(self.test_session.detect_mult * 2):
1426             p = wait_for_bfd_packet(self)
1427             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1428             self.test_session.send_packet()
1429         self.vpp_session.deactivate_auth(delayed=True)
1430         for dummy in range(self.test_session.detect_mult * 2):
1431             p = wait_for_bfd_packet(self)
1432             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1433             self.test_session.send_packet()
1434         self.test_session.bfd_key_id = None
1435         self.test_session.sha1_key = None
1436         self.test_session.send_packet()
1437         for dummy in range(self.test_session.detect_mult * 2):
1438             p = wait_for_bfd_packet(self)
1439             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1440             self.test_session.send_packet()
1441         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1442         self.assert_equal(len(self.vapi.collect_events()), 0,
1443                           "number of bfd events")
1444
1445     def test_auth_change_key_delayed(self):
1446         """ change auth key without disturbing session state (delayed) """
1447         key1 = self.factory.create_random_key(self)
1448         key1.add_vpp_config()
1449         key2 = self.factory.create_random_key(self)
1450         key2.add_vpp_config()
1451         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1452                                             self.pg0.remote_ip4, sha1_key=key1)
1453         self.vpp_session.add_vpp_config()
1454         self.vpp_session.admin_up()
1455         self.test_session = BFDTestSession(
1456             self, self.pg0, AF_INET, sha1_key=key1,
1457             bfd_key_id=self.vpp_session.bfd_key_id)
1458         bfd_session_up(self)
1459         for dummy in range(self.test_session.detect_mult * 2):
1460             p = wait_for_bfd_packet(self)
1461             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1462             self.test_session.send_packet()
1463         self.vpp_session.activate_auth(key2, delayed=True)
1464         for dummy in range(self.test_session.detect_mult * 2):
1465             p = wait_for_bfd_packet(self)
1466             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1467             self.test_session.send_packet()
1468         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1469         self.test_session.sha1_key = key2
1470         self.test_session.send_packet()
1471         for dummy in range(self.test_session.detect_mult * 2):
1472             p = wait_for_bfd_packet(self)
1473             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1474             self.test_session.send_packet()
1475         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1476         self.assert_equal(len(self.vapi.collect_events()), 0,
1477                           "number of bfd events")
1478
# Script entry point: when this module is executed directly, hand control to
# unittest.main with VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)