How to Extend Ansible Through Plugins

January 5, 2017 by James Cammarata


 

Did you know a large portion of Ansible’s functionality comes from the Ansible plugin system? These important pieces of code augment Ansible’s core functionality, such as parsing and loading inventory and Playbooks, running Playbooks, and reading the results. Essentially, Ansible uses plugins to extend what the system is doing under the hood.

In this blog, I’ll review each of these plugin types and offer a high-level overview of how to write your own plugins to extend Ansible functionality.

Action Plugins

Among the most critical plugins used by Ansible are action plugins. Any time you run a module, Ansible first runs an action plugin.

Action plugins are a layer between the executor engine and the module, and they allow controller-side actions to be taken before the module is executed. A good example of this is the template module. If you look at template.py in the modules directory, it’s basically a Python stub with documentation strings; everything else is done by the action plugin. The template action plugin renders the template locally into a temporary file, and then uses the copy or file modules to push it out to the target system.

If Ansible finds an action plugin with the same name as the module, that action plugin is used; otherwise, the 'normal' action plugin is used. Tasks which use 'async' have a special action plugin, which is used to launch the task using the 'async_wrapper' module.

The following code is the entirety of the 'normal' action plugin.

# code from plugins/action/normal.py
# (comments and most blank lines have been removed for brevity)

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

from ansible.plugins.action import ActionBase
from ansible.utils.vars import merge_hash


class ActionModule(ActionBase):
    def run(self, tmp=None, task_vars=None):
        if task_vars is None:
            task_vars = dict()
        results = super(ActionModule, self).run(tmp, task_vars)
        # remove as modules might hide due to nolog
        del results['invocation']['module_args']
        results = merge_hash(
            results,
            self._execute_module(tmp=tmp, task_vars=task_vars),
        )
        for field in ('_ansible_notify',):
            if field in results:
                results.pop(field)
        return results
        

New action plugins usually only need to subclass ActionBase and override the 'run()' method. Modules are executed remotely by the '_execute_module()' method, which also accepts other parameters, allowing you to run more than one module remotely to build more complex actions. For example, the 'template' action uses the 'copy' and 'file' modules to do the real work of copying the templated file to the remote system:

# from plugins/action/template.py
# L175-184 on the devel branch at the time of writing

        # run the copy module
        new_module_args.update(
            dict(
                src=xfered,
                dest=dest,
                original_basename=os.path.basename(source),
                follow=True,
            ),
        )
        result.update(
            self._execute_module(
                module_name='copy',
                module_args=new_module_args,
                task_vars=task_vars,
                tmp=tmp,
                delete_remote_tmp=False,
            )
        )

The above code is run after a temporary file is generated using the templating engine, which all occurs on the Ansible controller side. Using other modules in this manner allows us to avoid duplicating code and is very common in modules related to file operations.
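
To give a feel for the shape of a custom action plugin, here is a minimal, hypothetical sketch (the 'annotate' name and the controller-side timestamp are purely illustrative, not something shipped with Ansible); it records some data on the controller and then runs the matching module remotely:

# a minimal, hypothetical action plugin sketch (not shipped with Ansible)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import datetime

from ansible.plugins.action import ActionBase


class ActionModule(ActionBase):

    def run(self, tmp=None, task_vars=None):
        if task_vars is None:
            task_vars = dict()
        result = super(ActionModule, self).run(tmp, task_vars)

        # anything done here runs on the controller, before the module
        # is pushed out to the target host
        result['controller_time'] = datetime.datetime.utcnow().isoformat()

        # execute the module of the same name on the remote host and
        # merge its return data into our result
        result.update(self._execute_module(tmp=tmp, task_vars=task_vars))
        return result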

Callback Plugins

One of the more heavily developed plugin types, callbacks provide a way to react to events which occur during the execution of Playbooks. Ansible can load multiple callbacks; however, we differentiate between callbacks which send output to the screen and those that don’t, so that output to the screen stays legible.

Before Ansible 2.0, users had to copy plugins to the callback plugin directory in order to use them, and all plugins in that directory were executed. This made things a bit difficult if a user wished to change which callback plugins were run between different Playbook runs. In 2.0 we changed that to a whitelist, configurable in ansible.cfg or via an environment variable (ANSIBLE_CALLBACK_WHITELIST). Also in Ansible 2.0, we added a new set of events to allow callbacks to be used more easily across 1.9.x and 2.0+:

  • playbook_on_start
  • v2_playbook_on_start
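
For example, whitelisting the Slack callback shown below (plus the built-in 'timer' callback) could look like the following in ansible.cfg, or via the environment variable; the exact list is of course up to you:

# ansible.cfg
[defaults]
callback_whitelist = slack, timer

# or, equivalently, from the shell:
#   export ANSIBLE_CALLBACK_WHITELIST=slack,timer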

Below is an example of a callback plugin:

# from plugins/callback/slack.py

import json

from ansible.constants import mk_boolean
from ansible.module_utils.urls import open_url
from ansible.plugins.callback import CallbackBase

try:
    import prettytable
    HAS_PRETTYTABLE = True
except ImportError:
    HAS_PRETTYTABLE = False

class CallbackModule(CallbackBase):
    """This is an ansible callback plugin that sends status
    updates to a Slack channel during playbook execution.

    This plugin makes use of the following environment variables:
        SLACK_WEBHOOK_URL (required): Slack Webhook URL
        SLACK_CHANNEL     (optional): Slack room to post in. Default: #ansible
        SLACK_USERNAME    (optional): Username to post as. Default: ansible
        SLACK_INVOCATION  (optional): Show command line invocation
                                      details. Default: False

    Requires:
        prettytable

    """
    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'notification'
    CALLBACK_NAME = 'slack'
    CALLBACK_NEEDS_WHITELIST = True

...

    def send_msg(self, attachments):
        payload = {
            'channel': self.channel,
            'username': self.username,
            'attachments': attachments,
            'parse': 'none',
            'icon_url': ('http://www.ansible.com/hs-fs/hub/330046/file-449187601-png/ansible_badge.png'),
        }

        data = json.dumps(payload)
        self._display.debug(data)
        self._display.debug(self.webhook_url)
        try:
            response = open_url(self.webhook_url, data=data)
            return response.read()
        except Exception as e:
            self._display.warning('Could not submit message to Slack: %s' %
                                  str(e))

    def v2_playbook_on_play_start(self, play):
        """Display Play start messages"""

        name = play.name or 'Play name not specified (%s)' % play._uuid
        msg = '*Starting play* (_%s_)\n\n*%s*' % (self.guid, name)
        attachments = [
            {
                'fallback': msg,
                'text': msg,
                'color': 'warning',
                'mrkdwn_in': ['text', 'fallback', 'fields'],
            }
        ]
        self.send_msg(attachments=attachments)

Callback plugins have many entry points, which are triggered at various points in the executor engine. For a full listing, see the stubbed methods defined in CallbackBase (located in the 'plugins/callback/__init__.py' file).
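
As a starting point for your own callbacks, here is a minimal, hypothetical sketch (the 'log_failures' name and log path are just examples) that appends every failed task result to a local file on the controller:

# a minimal, hypothetical callback plugin sketch (not shipped with Ansible)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import json

from ansible.plugins.callback import CallbackBase


class CallbackModule(CallbackBase):

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'notification'
    CALLBACK_NAME = 'log_failures'
    CALLBACK_NEEDS_WHITELIST = True

    def v2_runner_on_failed(self, result, ignore_errors=False):
        # result is a TaskResult; _host, _task and _result carry the details
        entry = {
            'host': result._host.get_name(),
            'task': result._task.get_name(),
            'result': result._result,
        }
        with open('/tmp/ansible_failures.log', 'a') as f:
            f.write(json.dumps(entry, default=str) + '\n')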

Connection Plugins

Connection plugins, alongside action and callback plugins, are probably the most important plugins since they are used during the execution of every task. Connection plugins provide the transport layer between the Ansible controller and managed hosts. The simple API includes five methods:

  • _connect
  • exec_command
  • put_file
  • fetch_file
  • close

These make it very easy to write connection plugins for a wide range of transports. Whether it’s SSH, LXC, chroot, Docker, and so on, it’s easy to write something that lets Ansible take control of the target.

The following code snippet shows some code from the 'chroot' connection plugin:

# from plugins/connection/chroot.py

...

class Connection(ConnectionBase):
    ''' Local chroot based connections '''

    transport = 'chroot'
    has_pipelining = True
    # su currently has an undiagnosed issue with calculating the file
    # checksums (so copy, for instance, doesn't work right)
    # Have to look into that before re-enabling this
    become_methods = frozenset(C.BECOME_METHODS).difference(('su',))

...

    def _buffered_exec_command(self, cmd, stdin=subprocess.PIPE):
        ''' run a command on the chroot.  This is only needed for implementing
        put_file() get_file() so that we don't have to read the whole file
        into memory.

        compared to exec_command() it loses some niceties like being able to
        return the process's exit code immediately.
        '''
        executable = C.DEFAULT_EXECUTABLE.split()[0] \
                     if C.DEFAULT_EXECUTABLE else '/bin/sh'
        local_cmd = [self.chroot_cmd, self.chroot, executable, '-c', cmd]

        display.vvv("EXEC %s" % (local_cmd), host=self.chroot)
        local_cmd = \
          [to_bytes(i, errors='surrogate_or_strict') for i in local_cmd]
        p = subprocess.Popen(local_cmd, shell=False, stdin=stdin,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        return p

    def exec_command(self, cmd, in_data=None, sudoable=False):
        ''' run a command on the chroot '''
        super(Connection, self).exec_command(
            cmd,
            in_data=in_data,
            sudoable=sudoable,
        )

        p = self._buffered_exec_command(cmd)

        stdout, stderr = p.communicate(in_data)
        return (p.returncode, stdout, stderr)

...

The 'put_file' and 'fetch_file' methods also make use of the '_buffered_exec_command()' method shown above to move files in and out of the chroot. But to Ansible, this looks just like a remote host.
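
To illustrate the shape of the API, here is a minimal, hypothetical connection plugin sketch, loosely modeled on Ansible’s own 'local' connection: it "connects" to the controller itself and runs commands with subprocess (the 'localsh' name is made up, and error handling is omitted for brevity):

# a minimal, hypothetical connection plugin sketch (not shipped with Ansible)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import shutil
import subprocess

from ansible.plugins.connection import ConnectionBase


class Connection(ConnectionBase):
    ''' Hypothetical connection that just runs everything locally '''

    transport = 'localsh'

    def _connect(self):
        # nothing to set up for a local "connection"
        self._connected = True
        return self

    def exec_command(self, cmd, in_data=None, sudoable=False):
        super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable)
        p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = p.communicate(in_data)
        return (p.returncode, stdout, stderr)

    def put_file(self, in_path, out_path):
        # "uploading" a file is just a local copy
        shutil.copyfile(in_path, out_path)

    def fetch_file(self, in_path, out_path):
        # "downloading" is the same operation in the other direction
        shutil.copyfile(in_path, out_path)

    def close(self):
        self._connected = False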

Strategy Plugins

Strategy plugins are a new addition in Ansible 2.0 that allow users to execute tasks on hosts differently than Ansible traditionally did in 1.9.x and earlier. Ansible currently includes three strategies:

  • Linear (classic 1.x Ansible) - All hosts in the play must complete each task before any host moves on to the next task. Some users objected to the linear strategy because it slows things down, which led us to introduce the 'free' strategy.
  • Free - This allows each host to execute the tasks in its list as quickly as possible without waiting for other hosts to complete the same task.
  • Debug - This is one of the coolest things that has come from the community, in my opinion. The debug strategy turns Ansible into an interactive debugger.

The following code shows the 'debug' strategy, which turns the 'linear' strategy into an interactive debugger:

from ansible.plugins.strategy.linear import StrategyModule as LinearStrategyModule

...

class NextAction(object):
    """ The next action after an interpreter's exit. """
    REDO = 1
    CONTINUE = 2
    EXIT = 3
    def __init__(self, result=EXIT):
        self.result = result

class StrategyModule(LinearStrategyModule):
    def __init__(self, tqm):
        self.curr_tqm = tqm
        super(StrategyModule, self).__init__(tqm)

    def _queue_task(self, host, task, task_vars, play_context):
        self.curr_host = host
        self.curr_task = task
        self.curr_task_vars = task_vars
        self.curr_play_context = play_context

        super(StrategyModule, self)._queue_task(
            host,
            task,
            task_vars,
            play_context,
        )

    def _process_pending_results(self,
        iterator,
        one_pass=False,
        max_passes=None):
        if not hasattr(self, "curr_host"):
            return super(
                StrategyModule,
                self
            )._process_pending_results(iterator, one_pass, max_passes)

        prev_host_state = iterator.get_host_state(self.curr_host)
        results = super(
            StrategyModule,
            self)._process_pending_results(iterator, one_pass)

        while self._need_debug(results):
            next_action = NextAction()
            dbg = Debugger(self, results, next_action)
            dbg.cmdloop()

            if next_action.result == NextAction.REDO:
                # rollback host state
                self.curr_tqm.clear_failed_hosts()
                iterator._host_states[self.curr_host.name] = prev_host_state
                if reduce(lambda total, res : res.is_failed() or \
                    total, results, False):
                    self._tqm._stats.failures[self.curr_host.name] -= 1
                elif reduce(lambda total, res : res.is_unreachable() or \
                    total, results, False):
                    self._tqm._stats.dark[self.curr_host.name] -= 1

                # redo
                super(StrategyModule, self)._queue_task(
                    self.curr_host,
                    self.curr_task,
                    self.curr_task_vars,
                    self.curr_play_context
                )
                results = super(
                    StrategyModule,
                    self
                )._process_pending_results(iterator, one_pass)
            elif next_action.result == NextAction.CONTINUE:
                break
            elif next_action.result == NextAction.EXIT:
                exit(1)

        return results

    def _need_debug(self, results):
        return reduce(lambda total, res : res.is_failed() or \
               res.is_unreachable() or total, results, False)

class Debugger(cmd.Cmd):
    prompt = '(debug) '  # debugger
    prompt_continuous = '> '  # multiple lines

    def __init__(self, strategy_module, results, next_action):
        # cmd.Cmd is old-style class
        cmd.Cmd.__init__(self)

        self.intro = "Debugger invoked"
        self.scope = {}
        self.scope['task'] = strategy_module.curr_task
        self.scope['vars'] = strategy_module.curr_task_vars
        self.scope['host'] = strategy_module.curr_host
        self.scope['result'] = results[0]._result
        self.scope['results'] = results  # for debug of this debugger
        self.next_action = next_action

...

As noted above, the 'debug' strategy simply sub-classes the 'linear' strategy class and overrides two methods defined in StrategyBase: '_queue_task()' (which handles starting a worker to run the task) and '_process_pending_results()' (which reads results back from the workers). The bulk of the work is done in the latter, which invokes the interactive Debugger class when a failed task result is encountered and allows the user to do things like retry the task or modify internal Ansible variables and state.

Using the 'debug' strategy is very useful when writing new Playbooks and roles and can drastically reduce the number of times you have to re-run things while you’re getting your procedures straightened out.
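
Strategies are selected per play with the 'strategy' keyword; for example (the host pattern and task here are just placeholders):

# a quick example of selecting the 'debug' strategy for a play
- hosts: webservers
  strategy: debug
  tasks:
    - name: install nginx
      yum:
        name: nginx
        state: present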

Lookup Plugins

Lookup plugins are used mainly by the templating engine inside Ansible, and they’re used in two ways.

First, in a function syntax to load external information:

  • {{ lookup('pipe', '/usr/bin/whoami') }}
  • {{ lookup('etcd', 'somekey') }} – this allows you to fetch a key out of an etcd store.

Second, lookup plugins are also the source of 'with_*' loops (for example, with_items loads the 'items' lookup plugin). Both forms are shown in the short task snippet below.
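
Here is what the two usages look like in practice in a couple of tasks (the values are just placeholders):

# function syntax: fetch data on the controller and template it into a value
- debug:
    msg: "Running as {{ lookup('pipe', '/usr/bin/whoami') }}"

# loop syntax: with_items loads the 'items' lookup behind the scenes
- debug:
    msg: "{{ item }}"
  with_items:
    - one
    - two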

A couple of caveats to bear in mind about lookup plugins: they always execute on the Ansible controller, not on a remote system, and they are always expected to return a list of items because of their potential use in loops.

The following code shows the 'pipe' lookup mentioned above, which allows the user to fetch the output from a command-line program and store it in a variable:

# from plugins/lookup/pipe.py

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import subprocess

from ansible.errors import AnsibleError
from ansible.plugins.lookup import LookupBase

class LookupModule(LookupBase):
    def run(self, terms, variables, **kwargs):
        ret = []
        for term in terms:
            term = str(term)
            p = subprocess.Popen(
                term,
                cwd=self._loader.get_basedir(),
                shell=True,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE
            )
            (stdout, stderr) = p.communicate()
            if p.returncode == 0:
                ret.append(stdout.decode("utf-8").rstrip())
            else:
                raise AnsibleError(
                    "lookup_plugin.pipe(%s) returned %d" % \
                    (term, p.returncode)
                )
        return ret
        

Filter and Test Plugins

Filter and test plugin classes extend the Jinja2 templating system Ansible uses for variables. They allow you to do some cool things with data transformation, and to test the value or type of a variable being run through the Jinja2 engine.

For example, filters are used to transform data:

  • {{foo|int}}
  • {{foo|default('some value')}}

And tests are used to validate data:

  • {{foo is defined}}

The following code is an example of a filter which allows you to query JSON data using the jmespath query language:

# from plugins/filter/json_query.py

from ansible.errors import AnsibleError
from ansible.plugins.lookup import LookupBase
from ansible.utils.listify import listify_lookup_plugin_terms

try:
    import jmespath
    HAS_LIB = True
except ImportError:
    HAS_LIB = False

def json_query(data, expr):
    if not HAS_LIB:
        raise AnsibleError(
            'You need to install "jmespath" prior to running '
            'json_query filter'
        )
    return jmespath.search(expr, data)

class FilterModule(object):
    ''' Query filter '''
    def filters(self):
        return {
            'json_query': json_query
        }

Note that unlike the other plugins shown so far, this one does not have a base class (it subclasses 'object' instead). This is because filters and tests, as noted above, are really part of Jinja2, which we're extending here.
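
Since the example above only covers filters, here is a minimal, hypothetical test plugin sketch for comparison (the 'positive' test is just an example name); like filters, tests are plain callables returned from a dictionary, with no Ansible base class:

# a minimal, hypothetical test plugin sketch (not shipped with Ansible)
def positive(value):
    ''' return True if the value is a number greater than zero '''
    try:
        return float(value) > 0
    except (TypeError, ValueError):
        return False


class TestModule(object):
    ''' Custom Jinja2 tests, usable as {{ foo is positive }} '''
    def tests(self):
        return {
            'positive': positive,
        }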

Cache Plugins

Part of the variable and templating system, cache plugins are used to store gathered facts outside of local memory. This matters because, by default, Ansible uses the in-memory cache plugin, which can cause problems if your process involves running several individual Playbooks that all need fact data: each of those runs has to regather the facts, since they only reside in memory while a Playbook is running. In addition to the in-memory default, Ansible includes cache plugins to store fact data in memcached, Redis, or even just a flat JSON file.
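
Switching cache plugins is done through configuration rather than code; for instance, persisting facts to flat JSON files with the bundled 'jsonfile' plugin might look like this in ansible.cfg (the path and timeout are just examples):

# ansible.cfg
[defaults]
gathering = smart
fact_caching = jsonfile
fact_caching_connection = /tmp/ansible_fact_cache
fact_caching_timeout = 86400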

The following code shows the 'memory' cache plugin (which is the default as noted above):

# from plugins/cache/memory.py

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

from ansible.plugins.cache.base import BaseCacheModule

class CacheModule(BaseCacheModule):
    def __init__(self, *args, **kwargs):
        self._cache = {}

    def get(self, key):
        return self._cache.get(key)

    def set(self, key, value):
        self._cache[key] = value

    def keys(self):
        return self._cache.keys()

    def contains(self, key):
        return key in self._cache

    def delete(self, key):
        del self._cache[key]

    def flush(self):
        self._cache = {}

    def copy(self):
        return self._cache.copy()

Those familiar with Python may notice that cache plugins pretty much implement a dictionary interface.

Shell Plugins

Shell plugins are used to properly format commands for remote execution (quoting, escaping, logic, etc.). They were originally written to simplify the handling of ssh vs. winrm execution, but more plugins have been added for other shells (csh, fish, dash, to name a few).

Each connection plugin has a default shell plugin, for instance the winrm connection defaults to PowerShell.

The following code shows the 'csh' shell plugin:

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

from ansible.plugins.shell import ShellBase

class ShellModule(ShellBase):

    # Common shell filenames that this plugin handles
    COMPATIBLE_SHELLS = frozenset(('csh', 'tcsh'))
    # Family of shells this has.  Must match the filename without extension
    SHELL_FAMILY = 'csh'

    # How to end lines in a python script one-liner
    _SHELL_EMBEDDED_PY_EOL = '\\\n'
    _SHELL_REDIRECT_ALLNULL = '>& /dev/null'
    _SHELL_AND = '&&'
    _SHELL_OR = '||'
    _SHELL_SUB_LEFT = '"`'
    _SHELL_SUB_RIGHT = '`"'
    _SHELL_GROUP_LEFT = '('
    _SHELL_GROUP_RIGHT = ')'

    def env_prefix(self, **kwargs):
        return 'env %s' % super(ShellModule, self).env_prefix(**kwargs)

Shell plugins mostly just override class-level variables. A few methods are also available for overriding, such as the 'env_prefix()' method above (which controls how environment variables are formatted for the given shell).

Vars Plugins

Vars plugins are used to bring in additional host and group data at inventory parsing time. Oddly, Ansible doesn’t ship any vars plugins itself; they only exist outside of Ansible, so we have not defined a base class for them. However, we are looking at revamping inventory, which may remove the need for vars plugins.

When Should You Write a Plugin?

If Ansible isn’t doing what you need it to do, write a plugin to make it do so! In most cases, it’s very easy to add new plugins to extend the power of Ansible.

When writing a plugin, always use the provided base classes. The Ansible plugin loader (the main class responsible for finding and loading files with a specific plugin class) will ignore your plugin class if it doesn’t have the proper base class as a parent object. The great thing about base classes is that they provide a ton of pre-written methods so you don’t have to reinvent the wheel (or cargo-cult a bunch of code).

Testing and debugging your plugins is easy too. Writing unit tests for plugins is simplified thanks to Ansible’s plugin API, which makes it easy to load plugin objects.

Writing unit tests for certain plugin types, such as connection and action plugins, may be more difficult because they take a larger set of complex internal data structures, so mocking those is a bit more involved. For most users, the easiest way is to write an integration test, i.e. test your plugin via an Ansible Playbook itself and use the 'assert' module to validate the output. The code below provides a quick example of this:


- debug:
    msg: "{{ lookup('my_test_lookup') }}"
  register: result

- assert:
    that:
      - "result.msg == ['the output that I expect']"

Ready for more?

Check out my deep dive session from AnsibleFest Brooklyn 2016, where I walk through each of the plugin examples from this blog post.

James Cammarata

James Cammarata is a Senior Principal Software Engineer, Ansible, Red Hat.

