Rule Using Custom Parser

Overview

The purpose of a rule is to evaluate various facts and determine one or more results about a system. For our example rule we are interested in knowing whether a system with sshd is configured according to the following guidelines:

# Password based logins are disabled - only public key based logins are allowed.
AuthenticationMethods publickey

# LogLevel VERBOSE logs user's key fingerprint on login. Needed to have
# a clear audit track of which key was used to log in.
LogLevel VERBOSE

# Root login is not allowed for auditing reasons. This is because it's
# difficult to track which process belongs to which root user:
PermitRootLogin No

# Use only protocol 2 which is the default.  1 should not be listed
# Protocol 2

We also want to know what version of OpenSSH we are running if we find any problems.

You can find the complete implementation of the rule and test code in the directory insights-core-tutorials/insights_examples/rules.

We will use the same development environment that was set up at the beginning of the tutorial in the Preparing Your Development Environment section.

Secure Shell Server Rule

Rule Code

First we need to create a template rule file. It is recommended that you name the file based on the results it produces. Since we are looking at sshd security we will name the file mycomponents/rules/sshd_secure.py. Notice that the file is located in the rules subdirectory of your project:

(env)[userone@hostone mycomponents]$ touch rules/sshd_secure.py

Here’s the basic contents of the rule file:

from insights.core.plugins import make_fail, rule
from mycomponents.parsers.secure_shell import SSHDConfig

ERROR_KEY = "SSHD_SECURE"


@rule(SSHDConfig)
def report(sshd_config):
    """
    1. Evaluate config file facts
    2. Evaluate version facts
    """
    if results_found:
        return make_fail(ERROR_KEY, results=the_results)

First we import the insights-core methods make_fail() for creating a response and rule() to decorate our rule method so that it will be invoked by insights-core with the appropriate parser information. Then we import the parsers that provide the facts we need.

from insights.core.plugins import make_fail, rule
from mycomponents.parsers.secure_shell import SSHDConfig

Next we define a unique error key string, ERROR_KEY, that will be collected by insights-core when our rule is executed and provided in the results for all rules. This string must be unique among all of your rules, or the last rule to execute will overwrite any results from other rules with the same key.

ERROR_KEY = "SSHD_SECURE"
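
To see why the key must be unique, here is a minimal sketch that assumes responses are collected in a plain dictionary keyed by error key. The collect helper and the sample data are hypothetical and only illustrate the overwrite described above; the way insights-core actually stores results may differ.

```python
# Hypothetical stand-in for a results store keyed by error key.
results = {}


def collect(error_key, **data):
    """Store a rule's response under its error key (illustration only)."""
    results[error_key] = data


collect("SSHD_SECURE", errors={"LogLevel": "default"})
# A second rule that reuses the same key replaces the first response.
collect("SSHD_SECURE", errors={"Protocol": "1"})

print(results)
```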

The @rule() decorator is used to mark the rule method that will be invoked by insights-core. Arguments to @rule() consist of the parser and combiner objects that are necessary for rule processing. Each object may be either required, at least one from a list, or optional. All required objects must be available or the rule will not be called. One or more objects from the at least one list must be available or the rule will not be called. Zero or more objects can be available from the optional list.

In the rule decorator, required objects are listed first, followed by the “at least one” objects as a list argument, and finally the optional objects as a list passed with the keyword optional. For example, if a rule has the following input requirements:

Criteria        @rule Decorator Arguments
--------------  ----------------------------
Requires        SSHDConfig, InstalledRpms
At Least One    [ChkConfig, UnitFiles]
Optional        optional=[IPTables, IpAddr]

The decorator for the rule and the rule signature will look like this:

@rule(SSHDConfig, InstalledRpms, [ChkConfig, UnitFiles], optional=[IPTables, IpAddr])
def report(sshd_config, installed_rpms, chk_config, unit_files, ip_tables, ip_addr):
    # sshd_config and installed_rpms will always be present
    # at least one of chk_config and unit_files will be present
    # ip_tables and ip_addr will be present if data is available
    # arguments will be None if data is not available
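
The three kinds of requirements can be modeled in a few lines of plain Python. This is only a sketch of the semantics described above, not the insights-core implementation: can_run, build_args, and the string placeholders standing in for parser objects are all hypothetical.

```python
def can_run(available, required, at_least_one=(), optional=()):
    """Return True if a rule with these dependencies would be called.

    `available` maps component names to the data they produced.
    """
    # Every required component must be available.
    if not all(name in available for name in required):
        return False
    # If an "at least one" group is given, one member must be available.
    if at_least_one and not any(name in available for name in at_least_one):
        return False
    # Optional components never block the rule.
    return True


def build_args(available, required, at_least_one=(), optional=()):
    """Return one argument per declared dependency, None when it is missing."""
    names = list(required) + list(at_least_one) + list(optional)
    return [available.get(name) for name in names]


# Placeholder data: only three of the six components produced anything.
available = {
    "SSHDConfig": "sshd data",
    "InstalledRpms": "rpm data",
    "UnitFiles": "unit data",
}
required = ["SSHDConfig", "InstalledRpms"]
any_of = ["ChkConfig", "UnitFiles"]
optional = ["IPTables", "IpAddr"]

print(can_run(available, required, any_of, optional))
print(build_args(available, required, any_of, optional))
```

With this input the rule would be called, and the arguments for ChkConfig, IPTables, and IpAddr would be None.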

Currently our rule requires one parser, SSHDConfig. We will add a requirement to obtain facts about installed RPMs in the final code.

@rule(SSHDConfig)

The name of our rule method is report, but the name may be any valid method name. The purpose of the method is to evaluate the parser facts stored in the parser object sshd_config. If the evaluation finds any results, a response is created with the ERROR_KEY, and any data that you want associated with the results is included in the response. This data can be viewed in the results made available to a customer in the Red Hat Insights web interface. You may pass zero or more named arguments to make_fail to provide the data. Use meaningful argument names, as they make the results easier to understand.

def report(sshd_config):
    """
    1. Evaluate config file facts
    2. Evaluate version facts
    """
    if results_found:
        return make_fail(ERROR_KEY, results=the_results)

In order to perform the evaluation we need the facts for sshd_config and for the OpenSSH version. The SSHDConfig parser we developed will provide the facts for sshd_config, and we can use another parser, InstalledRpms, to help us determine facts about installed software.

Here is our updated rule with checks for the configuration options and the software version:

from insights.core.plugins import make_fail, rule
from mycomponents.parsers.secure_shell import SSHDConfig
from insights.parsers.installed_rpms import InstalledRpms

ERROR_KEY = "SSHD_SECURE"


@rule(InstalledRpms, SSHDConfig)
def report(installed_rpms, sshd_config):
    errors = {}

    auth_method = sshd_config.last('AuthenticationMethods')
    if auth_method:
        if auth_method.lower() != 'publickey':
            errors['AuthenticationMethods'] = auth_method
    else:
        errors['AuthenticationMethods'] = 'default'

    log_level = sshd_config.last('LogLevel')
    if log_level:
        if log_level.lower() != 'verbose':
            errors['LogLevel'] = log_level
    else:
        errors['LogLevel'] = 'default'

    permit_root = sshd_config.last('PermitRootLogin')
    if permit_root:
        if permit_root.lower() != 'no':
            errors['PermitRootLogin'] = permit_root
    else:
        errors['PermitRootLogin'] = 'default'

    # Default Protocol is 2
    protocol = sshd_config.last('Protocol')
    if protocol:
        if protocol.lower() != '2':
            errors['Protocol'] = protocol

    if errors:
        openssh_version = installed_rpms.get_max('openssh')
        return make_fail(ERROR_KEY, errors=errors, openssh=openssh_version.package)

This rule code checks the four configuration values AuthenticationMethods, LogLevel, PermitRootLogin, and Protocol, and returns any errors found using make_fail. If errors are found, the InstalledRpms parser facts are also queried to determine the installed version of OpenSSH, and that value is returned as well. If no errors are found, the rule implicitly returns None.
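
If you want to experiment with the checks before wiring them into insights-core, a stand-in object with a last() method is enough. The sketch below is an assumption-laden simplification: FakeSSHDConfig and evaluate are hypothetical names, the stand-in only splits each line on the first space, and the three "must be set" checks are expressed as a small table rather than as the repeated if/else blocks of the rule.

```python
class FakeSSHDConfig:
    """Hypothetical stand-in exposing only last(), as the rule uses it."""

    def __init__(self, text):
        self.values = {}
        for line in text.splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            keyword, _, value = line.partition(" ")
            # Later lines replace earlier ones, so last() sees the final value.
            self.values[keyword] = value.strip()

    def last(self, keyword):
        return self.values.get(keyword)


def evaluate(sshd_config):
    """Apply the same four checks as the rule, without insights-core."""
    errors = {}
    wanted_values = (
        ("AuthenticationMethods", "publickey"),
        ("LogLevel", "verbose"),
        ("PermitRootLogin", "no"),
    )
    for keyword, wanted in wanted_values:
        value = sshd_config.last(keyword)
        if not value:
            errors[keyword] = "default"
        elif value.lower() != wanted:
            errors[keyword] = value
    # Protocol defaults to 2, so a missing value is acceptable.
    protocol = sshd_config.last("Protocol")
    if protocol and protocol.lower() != "2":
        errors["Protocol"] = protocol
    return errors


print(evaluate(FakeSSHDConfig("LogLevel normal\nProtocol 1")))
```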

Now that we have the logic to check all of the rule conditions it is possible to refactor the rule to make the condition checks more obvious. This is sometimes helpful in testing your rule as will be discussed below. Here is the refactored rule:

from insights.core.plugins import make_fail, rule
from mycomponents.parsers.secure_shell import SSHDConfig
from insights.parsers.installed_rpms import InstalledRpms

ERROR_KEY = "SSHD_SECURE"


def check_auth_method(sshd_config, errors):
    auth_method = sshd_config.last('AuthenticationMethods')
    if auth_method:
        if auth_method.lower() != 'publickey':
            errors['AuthenticationMethods'] = auth_method
    else:
        errors['AuthenticationMethods'] = 'default'
    return errors


def check_log_level(sshd_config, errors):
    log_level = sshd_config.last('LogLevel')
    if log_level:
        if log_level.lower() != 'verbose':
            errors['LogLevel'] = log_level
    else:
        errors['LogLevel'] = 'default'
    return errors


def check_permit_root(sshd_config, errors):
    permit_root = sshd_config.last('PermitRootLogin')
    if permit_root:
        if permit_root.lower() != 'no':
            errors['PermitRootLogin'] = permit_root
    else:
        errors['PermitRootLogin'] = 'default'
    return errors


def check_protocol(sshd_config, errors):
    # Default Protocol is 2 if not specified
    protocol = sshd_config.last('Protocol')
    if protocol:
        if protocol.lower() != '2':
            errors['Protocol'] = protocol
    return errors


@rule(InstalledRpms, SSHDConfig)
def report(installed_rpms, sshd_config):
    errors = {}
    errors = check_auth_method(sshd_config, errors)
    errors = check_log_level(sshd_config, errors)
    errors = check_permit_root(sshd_config, errors)
    errors = check_protocol(sshd_config, errors)

    if errors:
        openssh_version = installed_rpms.get_max('openssh')
        return make_fail(ERROR_KEY, errors=errors, openssh=openssh_version.package)

To increase the readability of the rule output and possibly make the transition to insights content format smoother, add Jinja2 formatting to the sshd_secure rule. Here is the refactored code with the additional Jinja2 formatting:

from insights.core.plugins import make_fail, rule
from mycomponents.parsers.secure_shell import SSHDConfig
from insights.parsers.installed_rpms import InstalledRpms

ERROR_KEY = "SSHD_SECURE"

# Jinja2 template displayed for make_response results
CONTENT = ERROR_KEY + """
:{
                 {% for key, value in errors.items() -%}
                     {{key}}: {{value}}
                 {% endfor -%} }
OPEN_SSH_PACKAGE: {{openssh}}""".strip()


def check_auth_method(sshd_config, errors):
    auth_method = sshd_config.last('AuthenticationMethods')
    if auth_method:
        if auth_method.lower() != 'publickey':
            errors['AuthenticationMethods'] = auth_method
    else:
        errors['AuthenticationMethods'] = 'default'
    return errors


def check_log_level(sshd_config, errors):
    log_level = sshd_config.last('LogLevel')
    if log_level:
        if log_level.lower() != 'verbose':
            errors['LogLevel'] = log_level
    else:
        errors['LogLevel'] = 'default'
    return errors


def check_permit_root(sshd_config, errors):
    permit_root = sshd_config.last('PermitRootLogin')
    if permit_root:
        if permit_root.lower() != 'no':
            errors['PermitRootLogin'] = permit_root
    else:
        errors['PermitRootLogin'] = 'default'
    return errors


def check_protocol(sshd_config, errors):
    # Default Protocol is 2 if not specified
    protocol = sshd_config.last('Protocol')
    if protocol:
        if protocol.lower() != '2':
            errors['Protocol'] = protocol
    return errors


@rule(InstalledRpms, SSHDConfig)
def report(installed_rpms, sshd_config):
    errors = {}
    errors = check_auth_method(sshd_config, errors)
    errors = check_log_level(sshd_config, errors)
    errors = check_permit_root(sshd_config, errors)
    errors = check_protocol(sshd_config, errors)

    if errors:
        openssh_version = installed_rpms.get_max('openssh')
        return make_fail(ERROR_KEY, errors=errors, openssh=openssh_version.package)

Rule Testing

Testing is an important aspect of rule development and it helps ensure accurate rule logic. There are generally two types of testing to be performed on rules, unit and integration testing. If rule logic is divided among multiple methods then unit tests should be written to test the methods. If there is only one method then unit tests may not be necessary. Integration tests are necessary to test the rule in a simulated insights-core environment. This will be easier to understand by viewing the test code:

from mycomponents.rules import sshd_secure
from insights.tests import InputData, archive_provider, context_wrap
from insights.core.plugins import make_fail
from insights.specs import Specs
# The following imports are not necessary for integration tests
from mycomponents.parsers.secure_shell import SSHDConfig

OPENSSH_RPM = """
openssh-6.6.1p1-31.el7.x86_64
openssh-6.5.1p1-31.el7.x86_64
""".strip()

EXPECTED_OPENSSH = "openssh-6.6.1p1-31.el7"

GOOD_CONFIG = """
AuthenticationMethods publickey
LogLevel VERBOSE
PermitRootLogin No
# Protocol 2
""".strip()

BAD_CONFIG = """
AuthenticationMethods badkey
LogLevel normal
PermitRootLogin Yes
Protocol 1
""".strip()

DEFAULT_CONFIG = """
# All default config values
""".strip()


@archive_provider(sshd_secure.report)
def integration_tests():
    """
    InputData acts as the data source for the parsers
    so that they may execute and then be used as input
    to the rule.  So this is essentially an end-to-end
    test of the component chain.
    """
    input_data = InputData("GOOD_CONFIG")
    input_data.add(Specs.sshd_config, GOOD_CONFIG)
    input_data.add(Specs.installed_rpms, OPENSSH_RPM)
    yield input_data, None

    input_data = InputData("BAD_CONFIG")
    input_data.add(Specs.sshd_config, BAD_CONFIG)
    input_data.add(Specs.installed_rpms, OPENSSH_RPM)
    errors = {
        'AuthenticationMethods': 'badkey',
        'LogLevel': 'normal',
        'PermitRootLogin': 'Yes',
        'Protocol': '1'
    }
    expected = make_fail(sshd_secure.ERROR_KEY,
                         errors=errors,
                         openssh=EXPECTED_OPENSSH)
    yield input_data, expected

    input_data = InputData("DEFAULT_CONFIG")
    input_data.add(Specs.sshd_config, DEFAULT_CONFIG)
    input_data.add(Specs.installed_rpms, OPENSSH_RPM)
    errors = {
        'AuthenticationMethods': 'default',
        'LogLevel': 'default',
        'PermitRootLogin': 'default'
    }
    expected = make_fail(sshd_secure.ERROR_KEY,
                         errors=errors,
                         openssh=EXPECTED_OPENSSH)
    yield input_data, expected
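
The listing above contains only integration tests. Since the refactored rule splits its logic into helper methods, unit tests for those helpers might look like the sketch below. To keep it runnable on its own, check_auth_method is copied from the rule and the parser is replaced by StubConfig, a hypothetical stand-in that provides only last(). In a real test module you would import sshd_secure instead, and the context_wrap and SSHDConfig imports in the listing suggest building an actual parser object, which is typically done with something like SSHDConfig(context_wrap(GOOD_CONFIG)); treat that call as an assumption about your parser.

```python
class StubConfig:
    """Hypothetical stand-in for the parser: only last() is provided."""

    def __init__(self, **values):
        self.values = values

    def last(self, keyword):
        return self.values.get(keyword)


def check_auth_method(sshd_config, errors):
    # Copied from the refactored rule so this sketch runs on its own.
    auth_method = sshd_config.last('AuthenticationMethods')
    if auth_method:
        if auth_method.lower() != 'publickey':
            errors['AuthenticationMethods'] = auth_method
    else:
        errors['AuthenticationMethods'] = 'default'
    return errors


def test_auth_method_good():
    config = StubConfig(AuthenticationMethods='publickey')
    assert check_auth_method(config, {}) == {}


def test_auth_method_bad():
    config = StubConfig(AuthenticationMethods='badkey')
    assert check_auth_method(config, {}) == {'AuthenticationMethods': 'badkey'}


def test_auth_method_default():
    # No value configured, so the rule reports the default.
    assert check_auth_method(StubConfig(), {}) == {'AuthenticationMethods': 'default'}
```

Because these function names begin with test_, pytest collects them as ordinary unit tests.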

Test Data

Data utilized for all tests is defined in the test module. In this case we will use OPENSSH_RPM, an OpenSSH RPM version that is present in RHEL 7.2, and three configuration files for sshd_config. GOOD_CONFIG has all of the values that we are looking for and should not return any error results. BAD_CONFIG has all bad values, so it should return all error results. DEFAULT_CONFIG has no values present, so it should return errors for all values except Protocol, which defaults to the correct value.

OPENSSH_RPM = """
openssh-6.6.1p1-31.el7.x86_64
openssh-6.5.1p1-31.el7.x86_64
""".strip()

EXPECTED_OPENSSH = "openssh-6.6.1p1-31.el7"

GOOD_CONFIG = """
AuthenticationMethods publickey
LogLevel VERBOSE
PermitRootLogin No
# Protocol 2
""".strip()

BAD_CONFIG = """
AuthenticationMethods badkey
LogLevel normal
PermitRootLogin Yes
Protocol 1
""".strip()

DEFAULT_CONFIG = """
# All default config values
""".strip()
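
OPENSSH_RPM lists two versions of the package, and EXPECTED_OPENSSH names the newer one because the rule asks for the maximum installed version with get_max. The sketch below shows one simplified way to pick the newest entry from such a list. It is not RPM's version comparison algorithm and not how InstalledRpms is implemented; parse_nvr, version_key, and newest are hypothetical helpers that only handle straightforward name-version-release.arch strings like the ones in the test data.

```python
import re

OPENSSH_RPM = """
openssh-6.6.1p1-31.el7.x86_64
openssh-6.5.1p1-31.el7.x86_64
""".strip()


def parse_nvr(package):
    """Split 'name-version-release.arch' into (name, version, release)."""
    nvr, _, _arch = package.rpartition(".")
    name, version, release = nvr.rsplit("-", 2)
    return name, version, release


def version_key(text):
    """Break a version string into comparable numeric and alphabetic parts."""
    parts = re.findall(r"\d+|[A-Za-z]+", text)
    # Tag each part so integers and strings are never compared directly.
    return [(1, int(p)) if p.isdigit() else (0, p) for p in parts]


def newest(packages):
    """Return the highest 'name-version-release' among the given packages."""
    def key(package):
        _, version, release = parse_nvr(package)
        return version_key(version), version_key(release)

    return "-".join(parse_nvr(max(packages, key=key)))


print(newest(OPENSSH_RPM.splitlines()))
```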

Integration Tests

Integration tests are performed within the insights-core framework. The InputData class is used to define the raw data that we want to be present, and the framework creates an archive file to be input to the insights-core framework so that the parsers will be invoked, and then the rules will be invoked. You need to create InputData objects with all information that is necessary for parsers required by your rules. If input data is not present then parsers will not be executed, and if your rule requires a missing parser it will not be executed.

To create your integration tests, first create a method whose name does not begin with test_ and decorate it with @archive_provider(rule_name), where the argument is your rule function. Typically we name the method integration_tests.

@archive_provider(sshd_secure.report)
def integration_tests():

Next we create an InputData object. It is useful to provide a name argument to the constructor: when you execute integration tests, that name will show up in the results and make it easier to debug any problems. Then you add your test inputs to the InputData object that will be used to create the test archive. You add the data with the add method and identify the source of the data using the data source spec that is associated with the parser, such as Specs.sshd_config. Once all of the data has been added, a yield statement provides the input data and expected results to the archive_provider to run the test. In this particular test case we provided all good data, so we expect no results (None).

    input_data = InputData("GOOD_CONFIG")
    input_data.add(Specs.sshd_config, GOOD_CONFIG)
    input_data.add(Specs.installed_rpms, OPENSSH_RPM)
    yield input_data, None
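
The yield-based pattern can be pictured with a small, framework-free sketch. Everything here is hypothetical (fake_rule, integration_cases, run_cases, and the dictionaries standing in for InputData and make_fail responses); it only illustrates how a provider that yields (input, expected) pairs can drive a rule, and says nothing about how archive_provider works internally.

```python
def fake_rule(config):
    """Hypothetical rule: report a bad LogLevel, otherwise return None."""
    log_level = config.get("LogLevel", "default")
    if log_level.lower() != "verbose":
        return {"error_key": "SSHD_SECURE", "LogLevel": log_level}
    return None


def integration_cases():
    """Yield (input, expected) pairs, one per test case."""
    yield {"LogLevel": "VERBOSE"}, None
    yield {"LogLevel": "normal"}, {"error_key": "SSHD_SECURE", "LogLevel": "normal"}


def run_cases(rule_fn, provider):
    """Call the rule on every yielded input and compare with the expectation."""
    return [rule_fn(input_data) == expected for input_data, expected in provider()]


print(run_cases(fake_rule, integration_cases))
```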

Note

If your input data has a path that is significant to the interpretation of the data, such as /etc/sysconfig/network-scripts/ifcfg-eth0 where there may be multiple ifcfg scripts, you’ll need to add the path as well. For example:

input_data.add(Specs.ifcfg,
               IFCFG_ETH0,
               path="etc/sysconfig/network-scripts/ifcfg-eth0")
input_data.add(Specs.ifcfg,
               IFCFG_ETH1,
               path="etc/sysconfig/network-scripts/ifcfg-eth1")

In the second test case we use bad input data, so we must also provide the errors that we expect our rule to return to the framework. The expected results are in the same format as the return value we create in sshd_secure.report.

    input_data = InputData(name="BAD_CONFIG")
    input_data.add(Specs.sshd_config, BAD_CONFIG)
    input_data.add(Specs.installed_rpms, OPENSSH_RPM)
    errors = {
        'AuthenticationMethods': 'badkey',
        'LogLevel': 'normal',
        'PermitRootLogin': 'Yes',
        'Protocol': '1'
    }
    expected = make_fail(sshd_secure.ERROR_KEY,
                         errors=errors,
                         openssh=EXPECTED_OPENSSH)
    yield input_data, expected

Running the Tests

We execute these tests by moving to the root directory of our rules project, ensuring that our virtual environment is active, and running pytest:

(env)[userone@hostone mycomponents]$ pytest -k rules
====================== test session starts =============================================
platform linux -- Python 3.6.6, pytest-4.0.2, py-1.7.0, pluggy-0.8.0
rootdir: /home/userone/work/insights-core-tutorials, inifile: setup.cfg
plugins: cov-2.4.0
collected 22 items / 8 deselected / 14 selected

mycomponents/tests/integration.py .....                                           [ 83%]
=================== 14 passed, 8 deselected in 0.30 seconds ============================

You may also want to run the rule using insights-run. This will give you a better idea of what the output from the rule would be. We execute this test by moving to the root directory (insights-core-tutorials), ensuring that our virtual environment is active, and running insights-run -p rules/sshd_secure.py:

(insights-core)[userone@hostone mycomponents]$ insights-run -p rules/sshd_secure.py
---------
Progress:
---------
F

--------------
Rules Executed
--------------
[FAIL] rules.sshd_secure.report
------------------------
SSHD_SECURE:
    errors : {'AuthenticationMethods': 'default',
              'LogLevel': 'default',
              'PermitRootLogin': 'default',
              'Protocol': '1'}
    openssh: 'openssh-7.7p1-6.fc28'



----------------------
Rule Execution Summary
----------------------
Missing Deps: 0
Passed      : 0
Fingerprint : 0
Failed      : 1
Metadata    : 0
Metadata Key: 0
Exceptions  : 0

Note: If you have already built your parser in the mycomponents/parsers directory then you will see the following, otherwise you would only see tests for rules…

If any tests fail, you can use the pytest options -s -v --appdebug to get additional information. To limit which tests run, you can also use the -k test_filter_string option.