Tests Cleanup: Fix missing calls to setUpClass/tearDownClass.
[vpp.git] / test / test_syslog.py
1 #!/usr/bin/env python
2
3 import unittest
4 from framework import VppTestCase, VppTestRunner
5 from util import ppp
6 from scapy.packet import Raw
7 from scapy.layers.inet import IP, UDP
8 from vpp_papi_provider import SYSLOG_SEVERITY
9 from syslog_rfc5424_parser import SyslogMessage, ParseError
10 from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
11
12
13 class TestSyslog(VppTestCase):
14     """ Syslog Protocol Test Cases """
15
16     @classmethod
17     def setUpClass(cls):
18         super(TestSyslog, cls).setUpClass()
19
20         try:
21             cls.pg0, = cls.create_pg_interfaces(range(1))
22             cls.pg0.admin_up()
23             cls.pg0.config_ip4()
24             cls.pg0.resolve_arp()
25
26         except Exception:
27             super(TestSyslog, cls).tearDownClass()
28             raise
29
30     @classmethod
31     def tearDownClass(cls):
32         super(TestSyslog, cls).tearDownClass()
33
34     def syslog_generate(self, facility, severity, appname, msgid, sd=None,
35                         msg=None):
36         """
37         Generate syslog message
38
39         :param facility: facility value
40         :param severity: severity level
41         :param appname: application name that originate message
42         :param msgid: message identifier
43         :param sd: structured data (optional)
44         :param msg: free-form message (optional)
45         """
46         facility_str = ['kernel', 'user-level', 'mail-system',
47                         'system-daemons', 'security-authorization', 'syslogd',
48                         'line-printer', 'network-news', 'uucp', 'clock-daemon',
49                         '', 'ftp-daemon', 'ntp-subsystem', 'log-audit',
50                         'log-alert', '', 'local0', 'local1', 'local2',
51                         'local3', 'local4', 'local5', 'local6', 'local7']
52
53         severity_str = ['emergency', 'alert', 'critical', 'error', 'warning',
54                         'notice', 'informational', 'debug']
55
56         cli_str = "test syslog %s %s %s %s" % (facility_str[facility],
57                                                severity_str[severity],
58                                                appname,
59                                                msgid)
60         if sd is not None:
61             for sd_id, sd_params in sd.items():
62                 cli_str += " sd-id %s" % (sd_id)
63                 for name, value in sd_params.items():
64                     cli_str += " sd-param %s %s" % (name, value)
65         if msg is not None:
66             cli_str += " %s" % (msg)
67         self.vapi.cli(cli_str)
68
69     def syslog_verify(self, data, facility, severity, appname, msgid, sd=None,
70                       msg=None):
71         """
72         Verify syslog message
73
74         :param data: syslog message
75         :param facility: facility value
76         :param severity: severity level
77         :param appname: application name that originate message
78         :param msgid: message identifier
79         :param sd: structured data (optional)
80         :param msg: free-form message (optional)
81         """
82         message = data.decode('utf-8')
83         if sd is None:
84             sd = {}
85         try:
86             message = SyslogMessage.parse(message)
87         except ParseError as e:
88             self.logger.error(e)
89             raise
90         else:
91             self.assertEqual(message.facility, facility)
92             self.assertEqual(message.severity, severity)
93             self.assertEqual(message.appname, appname)
94             self.assertEqual(message.msgid, msgid)
95             self.assertEqual(message.msg, msg)
96             self.assertEqual(message.sd, sd)
97             self.assertEqual(message.version, 1)
98             self.assertEqual(message.hostname, self.pg0.local_ip4)
99
100     def test_syslog(self):
101         """ Syslog Protocol test """
102         self.vapi.syslog_set_sender(src_address=self.pg0.local_ip4n,
103                                     collector_address=self.pg0.remote_ip4n)
104         config = self.vapi.syslog_get_sender()
105         self.assertEqual(str(config.collector_address),
106                          self.pg0.remote_ip4)
107         self.assertEqual(config.collector_port, 514)
108         self.assertEqual(str(config.src_address), self.pg0.local_ip4)
109         self.assertEqual(config.vrf_id, 0)
110         self.assertEqual(config.max_msg_size, 480)
111
112         appname = 'test'
113         msgid = 'testMsg'
114         msg = 'this is message'
115         sd1 = {'exampleSDID@32473': {'iut': '3',
116                                      'eventSource': 'App',
117                                      'eventID': '1011'}}
118         sd2 = {'exampleSDID@32473': {'iut': '3',
119                                      'eventSource': 'App',
120                                      'eventID': '1011'},
121                'examplePriority@32473': {'class': 'high'}}
122
123         self.pg_enable_capture(self.pg_interfaces)
124         self.syslog_generate(SyslogFacility.local7,
125                              SyslogSeverity.info,
126                              appname,
127                              msgid,
128                              None,
129                              msg)
130         capture = self.pg0.get_capture(1)
131         try:
132             self.assertEqual(capture[0][IP].src, self.pg0.local_ip4)
133             self.assertEqual(capture[0][IP].dst, self.pg0.remote_ip4)
134             self.assertEqual(capture[0][UDP].dport, 514)
135             self.assert_packet_checksums_valid(capture[0], False)
136         except:
137             self.logger.error(ppp("invalid packet:", capture[0]))
138             raise
139         self.syslog_verify(capture[0][Raw].load,
140                            SyslogFacility.local7,
141                            SyslogSeverity.info,
142                            appname,
143                            msgid,
144                            None,
145                            msg)
146
147         self.pg_enable_capture(self.pg_interfaces)
148         self.vapi.syslog_set_filter(SYSLOG_SEVERITY.WARN)
149         filter = self.vapi.syslog_get_filter()
150         self.assertEqual(filter.severity, SYSLOG_SEVERITY.WARN)
151         self.syslog_generate(SyslogFacility.local7,
152                              SyslogSeverity.info,
153                              appname,
154                              msgid,
155                              None,
156                              msg)
157         self.pg0.assert_nothing_captured()
158
159         self.pg_enable_capture(self.pg_interfaces)
160         self.syslog_generate(SyslogFacility.local6,
161                              SyslogSeverity.warning,
162                              appname,
163                              msgid,
164                              sd1,
165                              msg)
166         capture = self.pg0.get_capture(1)
167         self.syslog_verify(capture[0][Raw].load,
168                            SyslogFacility.local6,
169                            SyslogSeverity.warning,
170                            appname,
171                            msgid,
172                            sd1,
173                            msg)
174
175         self.vapi.syslog_set_sender(self.pg0.local_ip4n,
176                                     self.pg0.remote_ip4n,
177                                     collector_port=12345)
178         config = self.vapi.syslog_get_sender()
179         self.assertEqual(config.collector_port, 12345)
180
181         self.pg_enable_capture(self.pg_interfaces)
182         self.syslog_generate(SyslogFacility.local5,
183                              SyslogSeverity.err,
184                              appname,
185                              msgid,
186                              sd2,
187                              None)
188         capture = self.pg0.get_capture(1)
189         try:
190             self.assertEqual(capture[0][UDP].dport, 12345)
191         except:
192             self.logger.error(ppp("invalid packet:", capture[0]))
193             raise
194         self.syslog_verify(capture[0][Raw].load,
195                            SyslogFacility.local5,
196                            SyslogSeverity.err,
197                            appname,
198                            msgid,
199                            sd2,
200                            None)
201
202
203 if __name__ == '__main__':
204     unittest.main(testRunner=VppTestRunner)