lb: remove api boilerplate
[vpp.git] / test / test_syslog.py
1 #!/usr/bin/env python
2
3 import unittest
4 from framework import VppTestCase, VppTestRunner
5 from util import ppp
6 from scapy.packet import Raw
7 from scapy.layers.inet import IP, UDP
8 from syslog_rfc5424_parser import SyslogMessage, ParseError
9 from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
10 from vpp_papi import VppEnum
11
12
class TestSyslog(VppTestCase):
    """ Syslog Protocol Test Cases

    Drives the syslog sender through the binary API (``self.vapi``) and the
    ``test syslog`` CLI command, captures the resulting UDP packets on
    ``pg0`` and checks them with ``syslog_rfc5424_parser``.
    """

    # CLI keywords for the ``test syslog`` command, indexed by the numeric
    # facility value.  Indices 10 and 15 map to an empty string.
    _FACILITY_NAMES = (
        'kernel', 'user-level', 'mail-system', 'system-daemons',
        'security-authorization', 'syslogd', 'line-printer', 'network-news',
        'uucp', 'clock-daemon', '', 'ftp-daemon', 'ntp-subsystem',
        'log-audit', 'log-alert', '', 'local0', 'local1', 'local2', 'local3',
        'local4', 'local5', 'local6', 'local7')

    # CLI keywords for the ``test syslog`` command, indexed by the numeric
    # severity value.
    _SEVERITY_NAMES = (
        'emergency', 'alert', 'critical', 'error', 'warning', 'notice',
        'informational', 'debug')

    @property
    def SYSLOG_SEVERITY(self):
        """Shortcut to the ``vl_api_syslog_severity_t`` API enum."""
        return VppEnum.vl_api_syslog_severity_t

    @classmethod
    def setUpClass(cls):
        """Create ``pg0``, bring it up, configure IPv4 and resolve ARP."""
        super(TestSyslog, cls).setUpClass()

        try:
            cls.pg0, = cls.create_pg_interfaces(range(1))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()

        except Exception:
            # unittest does not call tearDownClass when setUpClass raises,
            # so run the base-class teardown explicitly before re-raising.
            super(TestSyslog, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class teardown to the base test case."""
        super(TestSyslog, cls).tearDownClass()

    def syslog_generate(self, facility, severity, appname, msgid, sd=None,
                        msg=None):
        """
        Generate syslog message

        Builds a ``test syslog ...`` CLI command line from the arguments and
        executes it through ``self.vapi.cli``.

        :param facility: facility value (index into the facility keywords)
        :param severity: severity level (index into the severity keywords)
        :param appname: application name that originate message
        :param msgid: message identifier
        :param sd: structured data (optional), a dict mapping each SD-ID to
                   a dict of parameter name/value pairs
        :param msg: free-form message (optional)
        """
        cli_parts = ["test syslog %s %s %s %s" % (
            self._FACILITY_NAMES[facility],
            self._SEVERITY_NAMES[severity],
            appname,
            msgid)]
        if sd is not None:
            for sd_id, sd_params in sd.items():
                cli_parts.append("sd-id %s" % (sd_id))
                for name, value in sd_params.items():
                    cli_parts.append("sd-param %s %s" % (name, value))
        if msg is not None:
            cli_parts.append("%s" % (msg))
        self.vapi.cli(" ".join(cli_parts))

    def syslog_verify(self, data, facility, severity, appname, msgid, sd=None,
                      msg=None):
        """
        Verify syslog message

        Decodes ``data`` as UTF-8, parses it with ``SyslogMessage.parse`` and
        asserts every parsed field against the expected value.  The version
        is expected to be 1 and the hostname to be ``pg0``'s local IPv4
        address.

        :param data: syslog message (raw payload bytes)
        :param facility: facility value
        :param severity: severity level
        :param appname: application name that originate message
        :param msgid: message identifier
        :param sd: structured data (optional); ``None`` is compared as an
                   empty dict
        :param msg: free-form message (optional)
        :raises ParseError: if the payload cannot be parsed (the error is
                            logged first)
        """
        text = data.decode('utf-8')
        if sd is None:
            sd = {}
        try:
            message = SyslogMessage.parse(text)
        except ParseError as e:
            self.logger.error(e)
            raise

        self.assertEqual(message.facility, facility)
        self.assertEqual(message.severity, severity)
        self.assertEqual(message.appname, appname)
        self.assertEqual(message.msgid, msgid)
        self.assertEqual(message.msg, msg)
        self.assertEqual(message.sd, sd)
        self.assertEqual(message.version, 1)
        self.assertEqual(message.hostname, self.pg0.local_ip4)

    def test_syslog(self):
        """ Syslog Protocol test

        Steps: configure the sender and read the configuration back; send an
        informational message and verify the packet; raise the filter to
        WARN and check the informational message is no longer sent; send a
        warning with structured data; change the collector port and send an
        error message with two SD elements and no free-form text.
        """
        # Configure the sender with addresses only and read the config back;
        # port, VRF and max message size are expected to be 514, 0 and 480.
        self.vapi.syslog_set_sender(src_address=self.pg0.local_ip4n,
                                    collector_address=self.pg0.remote_ip4n)
        config = self.vapi.syslog_get_sender()
        self.assertEqual(str(config.collector_address),
                         self.pg0.remote_ip4)
        self.assertEqual(config.collector_port, 514)
        self.assertEqual(str(config.src_address), self.pg0.local_ip4)
        self.assertEqual(config.vrf_id, 0)
        self.assertEqual(config.max_msg_size, 480)

        appname = 'test'
        msgid = 'testMsg'
        msg = 'this is message'
        sd1 = {'exampleSDID@32473': {'iut': '3',
                                     'eventSource': 'App',
                                     'eventID': '1011'}}
        sd2 = {'exampleSDID@32473': {'iut': '3',
                                     'eventSource': 'App',
                                     'eventID': '1011'},
               'examplePriority@32473': {'class': 'high'}}

        # Informational message without structured data: exactly one packet
        # is expected, from the local to the remote address, UDP port 514.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(SyslogFacility.local7,
                             SyslogSeverity.info,
                             appname,
                             msgid,
                             None,
                             msg)
        capture = self.pg0.get_capture(1)
        try:
            self.assertEqual(capture[0][IP].src, self.pg0.local_ip4)
            self.assertEqual(capture[0][IP].dst, self.pg0.remote_ip4)
            self.assertEqual(capture[0][UDP].dport, 514)
            self.assert_packet_checksums_valid(capture[0], False)
        except Exception:
            # Dump the offending packet to the log before failing the test.
            self.logger.error(ppp("invalid packet:", capture[0]))
            raise
        self.syslog_verify(capture[0][Raw].load,
                           SyslogFacility.local7,
                           SyslogSeverity.info,
                           appname,
                           msgid,
                           None,
                           msg)

        # Set the filter to WARN and confirm it reads back; the same
        # informational message must then produce no packet.
        self.pg_enable_capture(self.pg_interfaces)
        self.vapi.syslog_set_filter(
            self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
        syslog_filter = self.vapi.syslog_get_filter()
        self.assertEqual(syslog_filter.severity,
                         self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
        self.syslog_generate(SyslogFacility.local7,
                             SyslogSeverity.info,
                             appname,
                             msgid,
                             None,
                             msg)
        self.pg0.assert_nothing_captured()

        # A warning with one structured-data element is still sent.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(SyslogFacility.local6,
                             SyslogSeverity.warning,
                             appname,
                             msgid,
                             sd1,
                             msg)
        capture = self.pg0.get_capture(1)
        self.syslog_verify(capture[0][Raw].load,
                           SyslogFacility.local6,
                           SyslogSeverity.warning,
                           appname,
                           msgid,
                           sd1,
                           msg)

        # Reconfigure the sender with a non-default collector port.
        self.vapi.syslog_set_sender(src_address=self.pg0.local_ip4n,
                                    collector_address=self.pg0.remote_ip4n,
                                    collector_port=12345)
        config = self.vapi.syslog_get_sender()
        self.assertEqual(config.collector_port, 12345)

        # Error message with two structured-data elements and no free-form
        # text; it must be sent to the new collector port.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(SyslogFacility.local5,
                             SyslogSeverity.err,
                             appname,
                             msgid,
                             sd2,
                             None)
        capture = self.pg0.get_capture(1)
        try:
            self.assertEqual(capture[0][UDP].dport, 12345)
        except Exception:
            # Dump the offending packet to the log before failing the test.
            self.logger.error(ppp("invalid packet:", capture[0]))
            raise
        self.syslog_verify(capture[0][Raw].load,
                           SyslogFacility.local5,
                           SyslogSeverity.err,
                           appname,
                           msgid,
                           sd2,
                           None)
207
208
# Run this module's tests with the VPP test runner when executed as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)