67a7ef13ea79e2e150ba986ae75af70f69afe37a
[vpp.git] / test / test_syslog.py
1 #!/usr/bin/env python
2
3 import unittest
4 from framework import VppTestCase, VppTestRunner
5 from util import ppp
6 from scapy.packet import Raw
7 from scapy.layers.inet import IP, UDP
8 from vpp_papi_provider import SYSLOG_SEVERITY
9 from syslog_rfc5424_parser import SyslogMessage, ParseError
10 from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
11
12
class TestSyslog(VppTestCase):
    """ Syslog Protocol Test Cases """

    # CLI keyword for each facility, indexed by the numeric facility value.
    # Positions 10 and 15 hold an empty string (no keyword listed here).
    _FACILITY_NAMES = (
        'kernel', 'user-level', 'mail-system', 'system-daemons',
        'security-authorization', 'syslogd', 'line-printer', 'network-news',
        'uucp', 'clock-daemon', '', 'ftp-daemon', 'ntp-subsystem',
        'log-audit', 'log-alert', '', 'local0', 'local1', 'local2',
        'local3', 'local4', 'local5', 'local6', 'local7',
    )

    # CLI keyword for each severity, indexed by the numeric severity value.
    _SEVERITY_NAMES = (
        'emergency', 'alert', 'critical', 'error', 'warning', 'notice',
        'informational', 'debug',
    )

    @classmethod
    def setUpClass(cls):
        """
        Create, bring up and address the single packet-generator interface

        If any step fails, the parent class tear-down is invoked before the
        exception is propagated.
        """
        super(TestSyslog, cls).setUpClass()

        try:
            (cls.pg0,) = cls.create_pg_interfaces(range(1))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()

        except Exception:
            super(TestSyslog, cls).tearDownClass()
            raise

    def syslog_generate(self, facility, severity, appname, msgid, sd=None,
                        msg=None):
        """
        Generate syslog message

        Builds a "test syslog ..." CLI command from the arguments and runs
        it through self.vapi.cli().

        :param facility: facility value (index into the facility keywords)
        :param severity: severity level (index into the severity keywords)
        :param appname: application name that originate message
        :param msgid: message identifier
        :param sd: structured data (optional), a dict mapping each SD-ID to
                   a dict of its parameter names and values
        :param msg: free-form message (optional)
        """
        parts = ["test syslog %s %s %s %s" % (self._FACILITY_NAMES[facility],
                                              self._SEVERITY_NAMES[severity],
                                              appname,
                                              msgid)]
        if sd is not None:
            for sd_id, sd_params in sd.items():
                parts.append("sd-id %s" % (sd_id,))
                for name, value in sd_params.items():
                    parts.append("sd-param %s %s" % (name, value))
        if msg is not None:
            parts.append("%s" % (msg,))
        # Every part is separated by exactly one space.
        self.vapi.cli(" ".join(parts))

    def syslog_verify(self, data, facility, severity, appname, msgid, sd=None,
                      msg=None):
        """
        Verify syslog message

        Decodes the payload as UTF-8, parses it with SyslogMessage.parse()
        and compares every parsed field with the expected value.  The
        version is expected to be 1 and the hostname to be the local IPv4
        address of pg0.

        :param data: syslog message (raw bytes)
        :param facility: facility value
        :param severity: severity level
        :param appname: application name that originate message
        :param msgid: message identifier
        :param sd: structured data (optional); None is compared as {}
        :param msg: free-form message (optional)
        :raises ParseError: if the payload cannot be parsed (logged first)
        """
        text = data.decode('utf-8')
        expected_sd = {} if sd is None else sd
        try:
            parsed = SyslogMessage.parse(text)
        except ParseError as e:
            self.logger.error(e)
            raise

        self.assertEqual(parsed.facility, facility)
        self.assertEqual(parsed.severity, severity)
        self.assertEqual(parsed.appname, appname)
        self.assertEqual(parsed.msgid, msgid)
        self.assertEqual(parsed.msg, msg)
        self.assertEqual(parsed.sd, expected_sd)
        self.assertEqual(parsed.version, 1)
        self.assertEqual(parsed.hostname, self.pg0.local_ip4)

    def test_syslog(self):
        """ Syslog Protocol test

        Steps:
        1. configure the sender and read the configuration back,
        2. send a message without structured data and verify the packet,
        3. raise the filter to WARN and check an informational message is
           not captured while a warning one (with structured data) is,
        4. change the collector port and verify a message with two
           structured data elements and no free-form text.
        """
        self.vapi.syslog_set_sender(src_address=self.pg0.local_ip4n,
                                    collector_address=self.pg0.remote_ip4n)
        config = self.vapi.syslog_get_sender()
        self.assertEqual(str(config.collector_address),
                         self.pg0.remote_ip4)
        # Port, VRF and maximum size were not set above; these are the
        # values the test expects to read back for them.
        self.assertEqual(config.collector_port, 514)
        self.assertEqual(str(config.src_address), self.pg0.local_ip4)
        self.assertEqual(config.vrf_id, 0)
        self.assertEqual(config.max_msg_size, 480)

        appname = 'test'
        msgid = 'testMsg'
        msg = 'this is message'
        sd1 = {'exampleSDID@32473': {'iut': '3',
                                     'eventSource': 'App',
                                     'eventID': '1011'}}
        sd2 = {'exampleSDID@32473': {'iut': '3',
                                     'eventSource': 'App',
                                     'eventID': '1011'},
               'examplePriority@32473': {'class': 'high'}}

        # Message without structured data.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(SyslogFacility.local7,
                             SyslogSeverity.info,
                             appname,
                             msgid,
                             None,
                             msg)
        capture = self.pg0.get_capture(1)
        try:
            self.assertEqual(capture[0][IP].src, self.pg0.local_ip4)
            self.assertEqual(capture[0][IP].dst, self.pg0.remote_ip4)
            self.assertEqual(capture[0][UDP].dport, 514)
            self.assert_packet_checksums_valid(capture[0], False)
        except Exception:
            # Dump the offending packet before the failure propagates.
            self.logger.error(ppp("invalid packet:", capture[0]))
            raise
        self.syslog_verify(capture[0][Raw].load,
                           SyslogFacility.local7,
                           SyslogSeverity.info,
                           appname,
                           msgid,
                           None,
                           msg)

        # With the filter at WARN an informational message must not be
        # captured on pg0.
        self.pg_enable_capture(self.pg_interfaces)
        self.vapi.syslog_set_filter(SYSLOG_SEVERITY.WARN)
        syslog_filter = self.vapi.syslog_get_filter()
        self.assertEqual(syslog_filter.severity, SYSLOG_SEVERITY.WARN)
        self.syslog_generate(SyslogFacility.local7,
                             SyslogSeverity.info,
                             appname,
                             msgid,
                             None,
                             msg)
        self.pg0.assert_nothing_captured()

        # A warning message with one structured data element is captured.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(SyslogFacility.local6,
                             SyslogSeverity.warning,
                             appname,
                             msgid,
                             sd1,
                             msg)
        capture = self.pg0.get_capture(1)
        self.syslog_verify(capture[0][Raw].load,
                           SyslogFacility.local6,
                           SyslogSeverity.warning,
                           appname,
                           msgid,
                           sd1,
                           msg)

        # Switch the collector to a non-default port.
        self.vapi.syslog_set_sender(self.pg0.local_ip4n,
                                    self.pg0.remote_ip4n,
                                    collector_port=12345)
        config = self.vapi.syslog_get_sender()
        self.assertEqual(config.collector_port, 12345)

        # Message with two structured data elements and no free-form text.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(SyslogFacility.local5,
                             SyslogSeverity.err,
                             appname,
                             msgid,
                             sd2,
                             None)
        capture = self.pg0.get_capture(1)
        try:
            self.assertEqual(capture[0][UDP].dport, 12345)
        except Exception:
            # Dump the offending packet before the failure propagates.
            self.logger.error(ppp("invalid packet:", capture[0]))
            raise
        self.syslog_verify(capture[0][Raw].load,
                           SyslogFacility.local5,
                           SyslogSeverity.err,
                           appname,
                           msgid,
                           sd2,
                           None)
197
198
# When executed as a script, run this module's tests with VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)