Tutorial - Custom Parser

Parser Development

The purpose of a Parser is to process raw content collected by the Client and map it into a format that is usable by Combiners and Rules. Raw content is content obtained directly from a system file or command, and may be collected by Insights Client, or from some other source such as a SOS Report. The following examples will demonstrate development of different types of parsers.

You can find the complete implementation of the parser and test code in the directory insights-core-tutorials/insights_examples/parsers.

Secure Shell Parser

Overview

Secure Shell or ssh (“SSH”) is a commonly used tool to access and interact with remote systems. The SSH server is configured on a system using the /etc/ssh/sshd_config file. Red Hat Enterprise Linux uses OpenSSH, and the syntax of the file is documented in the sshd_config(5) manual page.

Here is a portion of the configuration file showing the syntax:

#   $OpenBSD: sshd_config,v 1.93 2014/01/10 05:59:19 djm Exp $

Port 22
#AddressFamily any
ListenAddress 10.110.0.1
#ListenAddress ::

# The default requires explicit activation of protocol 1
#Protocol 2

Many lines begin with a #, which marks them as comments, and blank lines are used to aid readability. The important lines have a configuration keyword followed by a space and then a configuration value. So in the parser we want to make sure we capture the important lines and ignore the comments and blank lines.
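The filtering described above can be sketched with only the standard library. This is an illustration of the idea, not the insights-core implementation, and the helper name active_lines is hypothetical:

```python
SAMPLE = """\
#   $OpenBSD: sshd_config,v 1.93 2014/01/10 05:59:19 djm Exp $

Port 22
#AddressFamily any
ListenAddress 10.110.0.1
#ListenAddress ::
"""


def active_lines(text):
    """Return stripped lines that are neither blank nor comments (hypothetical helper)."""
    result = []
    for raw in text.splitlines():
        line = raw.strip()
        if line and not line.startswith("#"):
            result.append(line)
    return result


# Only the keyword/value lines survive the filter.
for line in active_lines(SAMPLE):
    keyword, value = line.split(None, 1)
    print(keyword, "=>", value)
```

Only the Port and ListenAddress lines remain, which is the input the parser needs to handle.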

Creating the Initial Parser Files

First we need to create the parser file. Parser files are implemented in modules. The module should be limited to one type of application. In this case we are working with the ssh application so we will create a secure_shell module. Create the module file ~/work/insights-core-tutorials/mycomponents/parsers/secure_shell.py in the parsers directory:

(env)[userone@hostone ~]$ cd ~/work/insights-core-tutorials/mycomponents/parsers
(env)[userone@hostone parsers]$ touch secure_shell.py

Now edit the file and create the parser skeleton:

from insights import Parser, parser
from insights.specs import Specs


@parser(Specs.sshd_config)
class SSHDConfig(Parser):

    def parse_content(self, content):
        pass

We start by importing the Parser class and the parser decorator. Our parser will inherit from the Parser class and it will be associated with the Specs.sshd_config data source using the parser decorator. Finally we need to implement the parse_content method, which is required to parse and store the input data in our class. The base class Parser implements a constructor that will invoke our parse_content method when an instance of the class is created.
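The relationship between the base class constructor and parse_content can be pictured with a small standalone sketch. MiniParser and LineCounter are hypothetical stand-ins written for this illustration; the real Parser class receives a context object and does more work than shown here:

```python
class MiniParser(object):
    """Hypothetical stand-in for a parser base class; not the insights-core Parser."""

    def __init__(self, content):
        # The base constructor drives parsing, so subclasses only
        # need to supply parse_content.
        self.parse_content(content)

    def parse_content(self, content):
        raise NotImplementedError("subclasses must implement parse_content")


class LineCounter(MiniParser):
    def parse_content(self, content):
        # Store whatever the subclass wants to expose as attributes.
        self.count = len(content)


counter = LineCounter(["Port 22", "Protocol 2"])
print(counter.count)
```

Because parsing happens during construction, the attributes set in parse_content are available as soon as the object exists.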

Next we’ll create the parser test file ~/work/insights-core-tutorials/mycomponents/parsers/tests/test_secure_shell.py as a skeleton that will aid in the parser development process:

from mycomponents.parsers.secure_shell import SSHDConfig


def test_sshd_config():
    pass

Once you have created and saved both of these files, run the test to make sure everything is set up correctly:

(env)[userone@hostone ~]$ cd ~/work/insights-core-tutorials
(env)[userone@hostone insights-core-tutorials]$ pytest -k secure_shell

======================== test session starts =============================
platform linux2 -- Python 2.7.15, pytest-3.0.6, py-1.7.0, pluggy-0.4.0
rootdir: /home/userone/work/mycomponents, inifile:
plugins: cov-2.4.0
collected 3 items

mycomponents/parsers/tests/test_secure_shell.py ...

===================== 3 passed in 1.26 seconds ===========================

Hint

You may sometimes see a message that pytest cannot be found, or see some other related message that doesn’t make sense. The first thing to check is that you have activated your virtual environment by executing the command source bin/activate from the root directory of your insights-core-tutorials project. You can deactivate the virtual environment by typing deactivate. You can find more information about virtual environments here: http://docs.python-guide.org/en/latest/dev/virtualenvs/
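If it is unclear whether the virtual environment is active, a quick check from Python can help. The following is a best-effort sketch; the function name in_virtualenv is hypothetical and the check covers only the common virtualenv and venv layouts:

```python
import sys


def in_virtualenv():
    """Best-effort check for an active virtual environment (hypothetical helper)."""
    # virtualenv on Python 2 sets sys.real_prefix; venv on Python 3
    # makes sys.prefix differ from sys.base_prefix.
    if hasattr(sys, "real_prefix"):
        return True
    return getattr(sys, "base_prefix", sys.prefix) != sys.prefix


print("virtual environment active:", in_virtualenv())
```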

Parser Implementation

Typically parser and combiner development is driven by rules that need facts generated by the parsers and combiners. Regardless of the specific requirements, it is important (1) to implement basic functionality by getting the raw data into a usable format, and (2) to not overdo the implementation because we can’t anticipate every use of the parser output. In our example we will eventually be implementing the rules that will warn us about systems that are not configured properly. Initially our parser implementation will be parsing the input data into key/value pairs. We may later discover that we can optimize rules by moving duplicate or complex processing into the parser.

Test Code

Referring back to our sample SSHD input we will start by creating a test for the output that we want from our parser:

from mycomponents.parsers.secure_shell import SSHDConfig
from insights.tests import context_wrap

SSHD_CONFIG_INPUT = """
#    $OpenBSD: sshd_config,v 1.93 2014/01/10 05:59:19 djm Exp $

Port 22
#AddressFamily any
ListenAddress 10.110.0.1
Port 22
ListenAddress 10.110.1.1
#ListenAddress ::

# The default requires explicit activation of protocol 1
#Protocol 2
Protocol 1
"""


def test_sshd_config():
    sshd_config = SSHDConfig(context_wrap(SSHD_CONFIG_INPUT))
    assert sshd_config is not None
    assert 'Port' in sshd_config
    assert 'PORT' in sshd_config
    assert sshd_config['port'] == ['22', '22']
    assert 'ListenAddress' in sshd_config
    assert sshd_config['ListenAddress'] == ['10.110.0.1', '10.110.1.1']
    assert sshd_config['Protocol'] == ['1']
    assert 'AddressFamily' not in sshd_config
    ports = [l for l in sshd_config if l.keyword == 'Port']
    assert len(ports) == 2
    assert ports[0].value == '22'

First we added an import for the helper function context_wrap which we’ll use to put our input data into a Context object to pass to our class constructor:

from mycomponents.parsers.secure_shell import SSHDConfig
from insights.tests import context_wrap

Next we include the sample data that will be used for the test. Blank lines and comments are discarded during parsing, so the leading and trailing newlines inside the triple-quoted string do not affect the results:

SSHD_CONFIG_INPUT = """
#    $OpenBSD: sshd_config,v 1.93 2014/01/10 05:59:19 djm Exp $

Port 22
#AddressFamily any
ListenAddress 10.110.0.1
Port 22
ListenAddress 10.110.1.1
#ListenAddress ::

# The default requires explicit activation of protocol 1
#Protocol 2
Protocol 1
"""

Next, to the body of the test, we add code to create an instance of our parser class:

def test_sshd_config():
    sshd_config = SSHDConfig(context_wrap(SSHD_CONFIG_INPUT))

Finally we add our tests using the attributes that we want to be able to access in our rules. First, a few assumptions about the data:

  1. some keywords may be present more than once in the config file
  2. we want to access keywords in a case insensitive way
  3. the order of the keywords matters
  4. we are not trying to validate the configuration file so we won’t parse the values or analyze sequence of keywords

Now here are the tests:

    assert sshd_config is not None
    assert 'Port' in sshd_config
    assert 'PORT' in sshd_config
    assert sshd_config['port'] == ['22', '22']
    assert 'ListenAddress' in sshd_config
    assert sshd_config['ListenAddress'] == ['10.110.0.1', '10.110.1.1']
    assert sshd_config['Protocol'] == ['1']
    assert 'AddressFamily' not in sshd_config
    ports = [l for l in sshd_config if l.keyword == 'Port']
    assert len(ports) == 2
    assert ports[0].value == '22'

Our tests assume that we want to know whether a particular keyword is present, regardless of character case used in the keyword, and we want to know the values of the keyword if present. We don’t want our rules to have to assume any particular case of characters in keywords so we can make it easy by performing case insensitive compares and assuming all lowercase for access. This may not always work, but in this example it is a safe assumption.
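To see why a plain dict does not meet these assumptions, consider the following standalone sketch. The sample pairs and the helper values_for are hypothetical and exist only for this illustration:

```python
pairs = [
    ("Port", "22"),
    ("ListenAddress", "10.110.0.1"),
    ("port", "2222"),
    ("ListenAddress", "10.110.1.1"),
]

# A plain dict keeps one value per exact key and is case sensitive,
# so repeated keywords are lost and 'Port' and 'port' are distinct keys.
as_dict = dict(pairs)
print(sorted(as_dict))


def values_for(pairs, keyword):
    """Return every value for keyword, ignoring case, in file order (hypothetical helper)."""
    wanted = keyword.lower()
    return [value for key, value in pairs if key.lower() == wanted]


print(values_for(pairs, "PORT"))
print(values_for(pairs, "listenaddress"))
```

Keeping the original list of pairs and comparing lowercase keywords preserves duplicates and order while still allowing case-insensitive access.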

Parser Code

The parse_content method is responsible for parsing the input data and storing the results in attributes of the parser object. You may choose whatever attributes your parser needs; there are no requirements to use specific names or types. Some general recommendations for parser class implementation are:

  • Choose attributes that make sense for use by actual rules, or how you anticipate rules to use the information. If rules need to iterate over the information then a list might be best, or if rules could access via keywords then dict might be better.
  • Choose attribute types that are not so complex they cannot be easily understood or serialized. Unless you know you need something complex keep it simple.
  • Use the @property decorator to create read-only getters and simplify access to information.
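As an illustration of the last recommendation, the sketch below exposes a computed value through @property. The Config class is hypothetical and is not part of the tutorial's parser:

```python
class Config(object):
    """Hypothetical parser-like class used only to illustrate @property."""

    def __init__(self, pairs):
        self._pairs = list(pairs)

    @property
    def keywords(self):
        # Computed on access; there is no setter, so assignment fails.
        return set(key.lower() for key, _ in self._pairs)


config = Config([("Port", "22"), ("Protocol", "2")])
print(sorted(config.keywords))

try:
    config.keywords = set()
except AttributeError:
    print("keywords is read-only")
```

Rules can read config.keywords like a plain attribute, but they cannot overwrite it by accident.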

Now we need to implement the parser that will satisfy our tests.

from collections import namedtuple
from insights import Parser, parser, get_active_lines
from insights.core.spec_factory import SpecSet, simple_file
import os


class LocalSpecs(SpecSet):
    """ Datasources for collection from local host """
    conf_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'sshd_config')

    sshd_config = simple_file(conf_file)


@parser(LocalSpecs.sshd_config)
class SSHDConfig(Parser):

    KeyValue = namedtuple('KeyValue', ['keyword', 'value', 'kw_lower'])

    def parse_content(self, content):
        self.lines = []
        for line in get_active_lines(content):
            kw, val = line.split(None, 1)
            self.lines.append(self.KeyValue(kw.strip(), val.strip(), kw.lower().strip()))
        self.keywords = set([k.kw_lower for k in self.lines])

    def __contains__(self, keyword):
        return keyword.lower() in self.keywords

    def __iter__(self):
        for line in self.lines:
            yield line

    def __getitem__(self, keyword):
        kw = keyword.lower()
        if kw in self.keywords:
            return [kv.value for kv in self.lines if kv.kw_lower == kw]

We added imports to our skeleton to utilize get_active_lines() and namedtuple. get_active_lines() is one of the many helper functions that you can find in insights/parsers/__init__.py, insights/core/__init__.py, and insights/util/__init__.py. get_active_lines() will remove all blank lines and comments from the input, which simplifies your parser’s parsing logic.

from collections import namedtuple
from insights import Parser, parser, get_active_lines
from insights.core.spec_factory import SpecSet, simple_file
import os

Since the sshd_config spec requires root access to read the /etc/ssh/sshd_config file, we created a local SpecSet class called LocalSpecs that contains a local sshd_config spec. This spec uses a local copy of the sshd_config file that does not require root access to read.

class LocalSpecs(SpecSet):
    """ Datasources for collection from local host """
    conf_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'sshd_config')

    sshd_config = simple_file(conf_file)

To get the sshd_config file needed for the local sshd_config spec you can copy it from ~/work/insights-core-tutorials/insights_examples/parsers/sshd_config to the ~/work/insights-core-tutorials/mycomponents/parsers directory as shown below.

(env)[userone@hostone insights-core-tutorials]$ cp ./insights_examples/parsers/sshd_config ./mycomponents/parsers/

We can use namedtuples to help simplify access to the information we are storing in our parser by creating a namedtuple with the named attributes keyword, value, and kw_lower where kw_lower is the lowercase version of the keyword.

    KeyValue = namedtuple('KeyValue', ['keyword', 'value', 'kw_lower'])
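For readers new to namedtuple, the following standalone snippet shows how such a tuple behaves. The sample values are made up for the illustration:

```python
from collections import namedtuple

KeyValue = namedtuple('KeyValue', ['keyword', 'value', 'kw_lower'])

entry = KeyValue('ListenAddress', '10.110.0.1', 'listenaddress')

print(entry.keyword)               # access by field name
print(entry[1])                    # access by position still works
keyword, value, kw_lower = entry   # unpacks like a regular tuple
print(entry._asdict()['kw_lower'])

# Named tuples are immutable, so a rule cannot modify parsed data in place.
try:
    entry.value = '10.0.0.1'
except AttributeError:
    print("KeyValue fields are read-only")
```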

In this particular parser we have chosen to store all lines (self.lines) as KeyValue named tuples since we don’t know what future rules might need. We are also storing the set of lowercase keywords (self.keywords) to make it easier to determine if a keyword is present in the data. The values are left unparsed as we don’t know how a rule might need to evaluate them.

    def parse_content(self, content):
        self.lines = []
        for line in get_active_lines(content):
            kw, val = line.split(None, 1)
            self.lines.append(self.KeyValue(kw.strip(), val.strip(), kw.lower().strip()))
        self.keywords = set([k.kw_lower for k in self.lines])
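The call line.split(None, 1) splits on the first run of whitespace only, so values that contain spaces stay intact. Note that unpacking the result into exactly two names raises ValueError for a line that has a keyword but no value. The sketch below shows one defensive variant; the helper split_line and its empty-string default are assumptions made for this illustration and are not part of the tutorial's parser:

```python
def split_line(line):
    """Split a config line into (keyword, value); value is '' when absent (hypothetical helper)."""
    parts = line.split(None, 1)
    keyword = parts[0]
    # The remainder may carry trailing whitespace, so strip it.
    value = parts[1].strip() if len(parts) > 1 else ''
    return keyword, value


print(split_line("Port 22"))
print(split_line("AllowUsers   alice bob"))
print(split_line("Subsystem\tsftp /usr/libexec/openssh/sftp-server"))
print(split_line("UsePAM"))
```

Whether a missing value should be stored as an empty string or skipped is a design choice that depends on what the rules need.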

Finally we implement some “dunder” methods to simplify use of the class. __contains__ enables the in operator for keyword checking. __iter__ enables iteration over the contents of self.lines. And __getitem__ enables access to all values of a keyword.

    def __contains__(self, keyword):
        return keyword.lower() in self.keywords

    def __iter__(self):
        for line in self.lines:
            yield line

    def __getitem__(self, keyword):
        kw = keyword.lower()
        if kw in self.keywords:
            return [kv.value for kv in self.lines if kv.kw_lower == kw]
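The mapping between operators and special methods can be tried out without insights-core installed. The Pairs class below is a hypothetical stand-in that mirrors the three methods using plain tuples; it is a sketch, not the tutorial's parser:

```python
class Pairs(object):
    """Hypothetical stand-in that mirrors the three special methods."""

    def __init__(self, pairs):
        self.lines = list(pairs)
        self.keywords = set(k.lower() for k, _ in self.lines)

    def __contains__(self, keyword):      # called by: keyword in obj
        return keyword.lower() in self.keywords

    def __iter__(self):                   # called by: for item in obj
        return iter(self.lines)

    def __getitem__(self, keyword):       # called by: obj[keyword]
        kw = keyword.lower()
        if kw in self.keywords:
            return [v for k, v in self.lines if k.lower() == kw]
        return None  # explicit here; the parser above returns None implicitly


conf = Pairs([("Port", "22"), ("Port", "2222"), ("Protocol", "2")])
print("PORT" in conf)
print(conf["port"])
print(conf["AddressFamily"])
print([k for k, _ in conf])
```

Unlike a dict, indexing with a missing keyword returns None instead of raising KeyError, so callers should test with in or check for None before using the result.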

We now have a complete implementation of our parser. It could certainly perform further analysis of the data and provide more methods for access, but it is better to keep the parser simple in the beginning. Once it is in use by rules it will be easy to add functionality to the parser to allow simplification of the rules.

Parser Documentation

The last step to complete implementation of our parser is to create the documentation. The guidelines and examples for parser documentation are provided in the section Documentation Guidelines.

The following shows our completed parser including documentation.

"""
secure_shell - Files for configuration of `ssh`
===============================================

The ``secure_shell`` module provides parsing for the ``sshd_config``
file.  The ``SSHDConfig`` class implements the parsing and
provides a ``list`` of all configuration lines present in
the file.

Sample content from the ``/etc/ssh/sshd_config`` file is::

    #       $OpenBSD: sshd_config,v 1.93 2014/01/10 05:59:19 djm Exp $

    Port 22
    #AddressFamily any
    ListenAddress 10.110.0.1
    Port 22
    ListenAddress 10.110.1.1
    #ListenAddress ::

    # The default requires explicit activation of protocol 1
    #Protocol 2
    Protocol 1

Examples:
    >>> 'Port' in sshd_config
    True
    >>> 'PORT' in sshd_config  # items are stored case-insensitive
    True
    >>> 'AddressFamily' in sshd_config  # comments are ignored
    False
    >>> sshd_config['port']  # All values stored by keyword in lists
    ['22', '22']
    >>> sshd_config['Protocol']  # Single items have one list element
    ['1']
    >>> [line for line in sshd_config if line.keyword == 'Port']  # can be used as an iterator
    [KeyValue(keyword='Port', value='22', kw_lower='port'), KeyValue(keyword='Port', value='22', kw_lower='port')]
    >>> sshd_config.last('ListenAddress')  # Easy way of finding the current configuration for a single item
    '10.110.1.1'
"""
from collections import namedtuple
from insights import Parser, parser, get_active_lines
from insights.specs import Specs
from insights.core.spec_factory import SpecSet, simple_file
import os


class LocalSpecs(SpecSet):
    """ Datasources for collection from local host """
    conf_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'sshd_config')

    sshd_config = simple_file(conf_file)


@parser(LocalSpecs.sshd_config)
class SSHDConfig(Parser):
    """Parsing for ``sshd_config`` file.

    Attributes:
        lines (list): List of `KeyValue` namedtuples for each line in
            the configuration file.
        keywords (set): Set of keywords present in the configuration
            file, each keyword has been converted to lowercase.
    """

    KeyValue = namedtuple('KeyValue', ['keyword', 'value', 'kw_lower'])
    """namedtuple: Represents a keyword/value pair along with the lowercase form of the keyword."""

    def parse_content(self, content):
        self.lines = []
        for line in get_active_lines(content):
            kw, val = (w.strip() for w in line.split(None, 1))
            self.lines.append(self.KeyValue(kw, val, kw.lower()))
        self.keywords = set([k.kw_lower for k in self.lines])

    def __contains__(self, keyword):
        return keyword.lower() in self.keywords

    def __iter__(self):
        for line in self.lines:
            yield line

    def __getitem__(self, keyword):
        kw = keyword.lower()
        if kw in self.keywords:
            return [kv.value for kv in self.lines if kv.kw_lower == kw]

    def last(self, keyword):
        """str: Returns the value of the last keyword found in config."""
        entries = self.__getitem__(keyword)
        if entries:
            return entries[-1]

Parser Testing

It is important that we ensure our tests will run successfully after any change to our parser. We can do that in two ways: first by using doctest to test the Examples section of the secure_shell module, and second by writing tests that pytest can run automatically. We start by adding import doctest and an import of the secure_shell module to our original test code:

from mycomponents.parsers.secure_shell import SSHDConfig
from mycomponents.parsers import secure_shell
from insights.tests import context_wrap
import doctest

SSHD_CONFIG_INPUT = """
#   $OpenBSD: sshd_config,v 1.93 2014/01/10 05:59:19 djm Exp $

Port 22
#AddressFamily any
ListenAddress 10.110.0.1
Port 22
ListenAddress 10.110.1.1
#ListenAddress ::

# The default requires explicit activation of protocol 1
#Protocol 2
Protocol 1
"""

def test_sshd_config():
    sshd_config = SSHDConfig(context_wrap(SSHD_CONFIG_INPUT))
    assert sshd_config is not None
    assert 'Port' in sshd_config
    assert 'PORT' in sshd_config
    assert sshd_config['port'] == ['22', '22']
    assert 'ListenAddress' in sshd_config
    assert sshd_config['ListenAddress'] == ['10.110.0.1', '10.110.1.1']
    assert sshd_config['Protocol'] == ['1']
    assert 'AddressFamily' not in sshd_config
    ports = [l for l in sshd_config if l.keyword == 'Port']
    assert len(ports) == 2
    assert ports[0].value == '22'

To test the documentation, we can then use doctest:

def test_sshd_documentation():
    """
    Here we test the examples in the documentation automatically using
    doctest.  We set up an environment which is similar to what a
    rule writer might see - a 'sshd_config' variable that has been
    passed in as a parameter to the rule declaration.  This saves doing
    this setup in the example code.
    """
    env = {
        'sshd_config': SSHDConfig(context_wrap(SSHD_CONFIG_INPUT)),
    }
    failed, total = doctest.testmod(secure_shell, globs=env)
    assert failed == 0

The environment setup allows us to ‘hide’ the setup of the environment that is normally provided to the rule, which is the context in which the example code is written. There’s no easy way to show the declaration of the rule, nor the parameter that is created with the parser object, but it’s good practice to supply an obvious name that rule writers might then use in their code.

The assert line here makes sure that any failures in the examples are detected by pytest. This will also include the testing output from doctest, showing where the code failed to evaluate or where the output differed from what was given.
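The effect of the globs argument can be seen in isolation with a throwaway module. In this sketch the module name demo and the variable settings are hypothetical, and a plain dict stands in for the parser object:

```python
import doctest
import types

# Build a throwaway module whose docstring uses a name it never defines.
demo = types.ModuleType("demo")
demo.__doc__ = """
Examples:
    >>> 'port' in settings
    True
    >>> settings['port']
    ['22', '22']
"""

# The globs mapping supplies 'settings' to the examples, much as the
# test above supplies 'sshd_config'.
env = {"settings": {"port": ["22", "22"]}}
result = doctest.testmod(demo, globs=env)
print(result.failed, result.attempted)
```

Without the globs mapping the examples would fail with a NameError, which is why the test builds the environment before calling doctest.testmod.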

Because this code essentially duplicates many of the things previously tested explicitly in the test_sshd_config function, we can remove some of those tests and only test the ‘corner cases’:

SSHD_DOCS_EXAMPLE = '''
Port 22
Port 22
'''

def test_sshd_corner_cases():
    """
    Here we test any corner cases for behavior we expect to deal with
    in the parser but doesn't make a good example.
    """
    config = SSHDConfig(context_wrap(SSHD_DOCS_EXAMPLE))
    assert config.last('AddressFamily') is None
    assert config['AddressFamily'] is None
    ports = [l for l in config if l.keyword == 'Port']
    assert len(ports) == 2
    assert ports[0].value == '22'

The final version of our test now looks like this:

from mycomponents.parsers.secure_shell import SSHDConfig
from mycomponents.parsers import secure_shell
from insights.tests import context_wrap
import doctest

SSHD_CONFIG_INPUT = """
#   $OpenBSD: sshd_config,v 1.93 2014/01/10 05:59:19 djm Exp $

Port 22
#AddressFamily any
ListenAddress 10.110.0.1
Port 22
ListenAddress 10.110.1.1
#ListenAddress ::

# The default requires explicit activation of protocol 1
#Protocol 2
Protocol 1
"""

def test_sshd_config():
    sshd_config = SSHDConfig(context_wrap(SSHD_CONFIG_INPUT))
    assert sshd_config is not None
    assert 'Port' in sshd_config
    assert 'PORT' in sshd_config
    assert sshd_config['port'] == ['22', '22']
    assert 'ListenAddress' in sshd_config
    assert sshd_config['ListenAddress'] == ['10.110.0.1', '10.110.1.1']
    assert sshd_config['Protocol'] == ['1']
    assert 'AddressFamily' not in sshd_config
    ports = [l for l in sshd_config if l.keyword == 'Port']
    assert len(ports) == 2
    assert ports[0].value == '22'


def test_sshd_documentation():
    """
    Here we test the examples in the documentation automatically using
    doctest.  We set up an environment which is similar to what a
    rule writer might see - a 'sshd_config' variable that has been
    passed in as a parameter to the rule declaration.  This saves doing
    this setup in the example code.
    """
    env = {
        'sshd_config': SSHDConfig(context_wrap(SSHD_CONFIG_INPUT)),
    }
    failed, total = doctest.testmod(secure_shell, globs=env)
    assert failed == 0


SSHD_DOCS_EXAMPLE = '''
Port 22
Port 22
'''


def test_sshd_corner_cases():
    """
    Here we test any corner cases for behavior we expect to deal with
    in the parser but doesn't make a good example.
    """
    config = SSHDConfig(context_wrap(SSHD_DOCS_EXAMPLE))
    assert config.last('AddressFamily') is None
    assert config['AddressFamily'] is None
    ports = [l for l in config if l.keyword == 'Port']
    assert len(ports) == 2
    assert ports[0].value == '22'

To run pytest on just the completed secure_shell parser execute the following command:

(env)[userone@hostone ~]$ cd ~/work/insights-core-tutorials
(env)[userone@hostone insights-core-tutorials]$ pytest -k secure_shell

Once your tests all run successfully your parser is complete.