[python] Nornir Network Automation Framework: A Code Walkthrough

Programming Languages | Published: 2021-07-05 19:22

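  Nornir is a Python framework for managing network devices, similar in spirit to Ansible.
  Official definition:
Nornir is an automation framework written in python to be used with python
In other words, it is a (network) automation framework that is both developed in Python and driven from Python code.
  Official documentation: https://nornir.readthedocs.io/en/latest/index.html
  Goal of this article: walk through Nornir's underlying code as groundwork for extending it later.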

1. Getting to know Nornir

  First, follow the tutorial on the official site to get the whole Nornir workflow running end to end.

  • a. Directory layout
nornir_ssh % ls -R
config.yaml   inventory     nornir.log    nornir01.py
./inventory:
defaults.yaml   groups.yaml   hosts.yaml

  • b. File contents
# ./inventory/hosts.yaml
---
h3c_sw:
  hostname: 192.168.31.103
  port: 22
  username: admin
  password: admin
  platform: hp_comware  # The value depends on the connection plugin in use.
                        # This example uses netmiko, which connects to H3C devices with the hp_comware device_type;
                        # napalm may use a different name.
  groups:
    - H3C_Device
  data:
    cmds:
      - display cur
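# ./inventory/groups.yaml
---
H3C_Device:
  data:
    model: vsr1000  # "data" is a mapping; the key name "model" is an assumed placeholder
Cisco_Device:
  data:
    model: ios      # same assumed placeholder key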
# ./inventory/defaults.yaml
(empty)
# nornir.log
(empty)
# config.yaml
---
inventory:
  plugin: SimpleInventory
  options:
    host_file: "inventory/hosts.yaml"
    group_file: "inventory/groups.yaml"
    defaults_file: "inventory/defaults.yaml"
runner:
  plugin: threaded
  options:
    num_workers: 100
# nornir01.py (the file name is arbitrary)
from nornir import InitNornir
from nornir_utils.plugins.functions import print_result
from nornir_netmiko import netmiko_send_command

def show_cmds(task):
  outputs = []
  cmds = task.host.data['cmds']  # the command list defined under "data" in hosts.yaml
  for cmd in cmds:
    result = task.run(netmiko_send_command, command_string=cmd)
    output = result.result
    outputs.append(output)  # append the whole string; "+=" would add it character by character
  return outputs

nr = InitNornir(config_file='config.yaml', dry_run=True)
results = nr.run(task=show_cmds)
print_result(results['h3c_sw'][0])
  Run the Python file above:
python nornir01.py
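One detail in show_cmds is worth checking in isolation: each command's output is a string, and a Python list treats `+=` and `append` differently when given a string. The sketch below uses made-up output strings (the name `fake_outputs` is hypothetical) and involves neither a device nor Nornir.

```python
# Hypothetical command outputs; real ones would come from netmiko_send_command.
fake_outputs = ["version 7.1", "sysname h3c_sw"]

extended = []
appended = []
for output in fake_outputs:
    extended += output        # a str is iterable, so this adds one character at a time
    appended.append(output)   # this adds the whole string as a single element

print(len(extended), len(appended))
print(appended)
```

With `append`, the list returned by the task has one element per command, which is usually what a caller expects.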
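2. Nornir component concepts and the Python packaging background
# 1. At the lowest level a task is a plain function; that function is later passed as an argument to Task and wrapped in a Task object.
# 2. A processor is essentially an interface, that is, a class that must implement a specific set of methods.
# 3. Understand what the pkg_resources package does: it loads objects defined by files in certain directories (the entry points of installed packages) into a dictionary. Both the connection-plugin and the runner-plugin registration below rely on this mechanism.
# 4. What is a connection plugin? For example netmiko or napalm.
# 5. What is a runner plugin? For example threaded.
# 6. Once the plugins are loaded, Nornir checks whether the plugin dictionary ("available") contains a plugin object with the requested name.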
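The registry idea behind these points can be sketched with a plain dictionary. This is a simplified illustration, not Nornir's implementation: the class `PluginRegister`, its methods, and the two stand-in runner classes are hypothetical names, and plugins are registered by hand rather than discovered through pkg_resources.

```python
from typing import Any, Dict


class PluginRegister:
    """Toy registry: maps a plugin name to the object that implements it."""

    def __init__(self) -> None:
        self.available: Dict[str, Any] = {}

    def register(self, name: str, plugin: Any) -> None:
        self.available[name] = plugin

    def get_plugin(self, name: str) -> Any:
        # Fail early if the requested name was never registered.
        if name not in self.available:
            raise KeyError(f"plugin not registered: {name!r}")
        return self.available[name]


class SerialRunner:      # stand-in for a runner plugin
    pass


class ThreadedRunner:    # stand-in for a runner plugin
    pass


runners = PluginRegister()
runners.register("serial", SerialRunner)
runners.register("threaded", ThreadedRunner)

# The name would normally come from config.yaml (runner.plugin).
runner_cls = runners.get_plugin("threaded")
```

Looking plugins up by name is what lets a configuration file pick an implementation with a plain string.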
3. Tracing the whole flow through Nornir's underlying code

  The walkthrough below concentrates on these lines:
def show_cmds(task):
  outputs = []
  cmds = task.host.data['cmds']
  for cmd in cmds:
    result = task.run(netmiko_send_command, command_string=cmd)
    output = result.result
    outputs.append(output)  # append the whole string; "+=" would add it character by character
  return outputs
nr = InitNornir(config_file='config.yaml', dry_run=True)
results = nr.run(task=show_cmds)

  • 3.1. The custom function show_cmds must accept one parameter, which will receive a Task instance at call time:
# nornir/core/task.py -> Task.start()
r = self.task(self, **self.params)  # self.task here is the custom function show_cmds
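To see why the parameter is mandatory, here is a minimal sketch of the same calling convention. `MiniTask` and `show_host` are hypothetical names for illustration; the real Task class carries much more state.

```python
from typing import Any, Callable, Dict, Optional


class MiniTask:
    """Toy stand-in for nornir's Task: wraps a function plus its keyword arguments."""

    def __init__(self, task: Callable[..., Any], **params: Any) -> None:
        self.task = task
        self.params: Dict[str, Any] = params
        self.host: Optional[str] = None

    def start(self, host: str) -> Any:
        self.host = host
        # The wrapper passes itself as the first positional argument.
        return self.task(self, **self.params)


def show_host(task: MiniTask, prefix: str = "") -> str:
    # "task" is the MiniTask instance, so the current host is reachable from it.
    return f"{prefix}{task.host}"


greeting = MiniTask(show_host, prefix="host=").start("h3c_sw")
print(greeting)
```

A function declared without any parameter would raise a TypeError at the `self.task(self, ...)` call, because the wrapper always supplies itself.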

  • 3.2. InitNornir() initializes the configuration from the config file or from keyword arguments
def InitNornir(config_file: str = "", dry_run: bool = False, **kwargs: Any,) -> Nornir:
  """
  Arguments:
    config_file(str): Path to the configuration file (optional)
    dry_run(bool): Whether to simulate changes or not
    configure_logging: Whether to configure logging or not. This argument is being
      deprecated. Please use logging.enabled parameter in the configuration
      instead.
    **kwargs: Extra information to pass to the
      :obj:`nornir.core.configuration.Config` object
  Returns:
    :obj:`nornir.core.Nornir`: fully instantiated and configured
  """
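  ConnectionPluginRegister.auto_register()  # register/load connection plugins such as netmiko and napalm, via the pkg_resources package
  # If a config file is given, initialize from it; otherwise initialize from the keyword arguments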
  if config_file:
    config = Config.from_file(config_file, **kwargs)
  else:
    config = Config.from_dict(**kwargs)
  data = GlobalState(dry_run=dry_run)
  config.logging.configure()
  # Return a Nornir instance
  return Nornir(
    inventory=load_inventory(config),  # load the inventory plugin (e.g. SimpleInventory) and turn the host inventory files into Python objects
    runner=load_runner(config),  # load the runner plugin, e.g. threaded or serial
    config=config,
    data=data,
  )
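The branch on config_file can be sketched as follows. This is a simplified illustration under stated assumptions: `load_config` is a hypothetical helper, JSON replaces YAML to stay within the standard library, and the rule that keyword arguments override file values is an assumption of this sketch.

```python
import json
import os
import tempfile
from typing import Any, Dict


def load_config(config_file: str = "", **kwargs: Any) -> Dict[str, Any]:
    """Toy version of the branch in InitNornir: file first, keyword arguments otherwise."""
    if config_file:
        with open(config_file, encoding="utf-8") as fh:
            config = json.load(fh)   # JSON only to avoid a third-party YAML dependency
        config.update(kwargs)        # assumption: keyword arguments override file values
        return config
    return dict(kwargs)


# Write a throwaway config file so the example runs anywhere.
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as tmp:
    json.dump({"runner": {"plugin": "threaded"}}, tmp)

from_file = load_config(tmp.name)
from_kwargs = load_config(runner={"plugin": "serial"})
os.remove(tmp.name)

print(from_file)
print(from_kwargs)
```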

  • 3.3 Next, look at the Nornir.run() method
  The key line is result = self.runner.run(task, run_on), which hands control to the run() method of the runner plugin.
# nornir/core/__init__.py -> Nornir.run()
  def run(
    self,
    task,
    raise_on_error=None,
    on_good=True,
    on_failed=False,
    name: Optional[str] = None,
    **kwargs,
  ):
    """
    Run task over all the hosts in the inventory.
    Arguments:
      task (``callable``): function or callable that will be run against each device in
        the inventory
      raise_on_error (``bool``): Override raise_on_error behavior
      on_good(``bool``): Whether to run or not this task on hosts marked as good
      on_failed(``bool``): Whether to run or not this task on hosts marked as failed
      **kwargs: additional argument to pass to ``task`` when calling it
    Raises:
      :obj:`nornir.core.exceptions.NornirExecutionError`: if at least a task fails
        and self.config.core.raise_on_error is set to ``True``
    Returns:
      :obj:`nornir.core.task.AggregatedResult`: results of each execution
    """
    task = Task(
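      task,  # task is a function
      self,  # the Nornir instance
      global_dry_run=self.data.dry_run,
      name=name,
      processors=self.processors,  # pass in the list of processors; a processor is a class implementing certain methods that are called at specific points during task execution
      **kwargs,  # these end up as the keyword arguments of the task function
    )
    self.processors.task_started(task)  # call each processor's task_started method before the task runs; processors apply to the whole run, not to one specific device
    run_on = []  # the hosts on which the task function will run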
    if on_good:
      for name, host in self.inventory.hosts.items():
        if name not in self.data.failed_hosts:
          run_on.append(host)
    if on_failed:
      for name, host in self.inventory.hosts.items():
        if name in self.data.failed_hosts:
          run_on.append(host)
    # Log the total number of hosts in the inventory
    num_hosts = len(self.inventory.hosts)
    if num_hosts:
      logger.info(
        "Running task %r with args %s on %d hosts",
        task.name,
        kwargs,
        num_hosts,
      )
    else:
      logger.warning("Task %r has not been run – 0 hosts selected", task.name)
    # Pass the wrapped Task object and the selected hosts to runner.run()
    result = self.runner.run(task, run_on)
    # Error handling for runner.run()
    raise_on_error = (
      raise_on_error
      if raise_on_error is not None
      else self.config.core.raise_on_error
    )  # noqa
    if raise_on_error:
      result.raise_on_error()
    else:
      self.data.failed_hosts.update(result.failed_hosts.keys())
    # After runner.run() finishes, call processors.task_completed()
    self.processors.task_completed(task, result)
    return result
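The host selection inside Nornir.run() can be isolated into a small function. The sketch below mirrors the two loops that build run_on; `select_hosts` and the sample inventory are hypothetical, and hosts are plain strings rather than Host objects.

```python
from typing import Dict, List, Set


def select_hosts(
    hosts: Dict[str, str],
    failed_hosts: Set[str],
    on_good: bool = True,
    on_failed: bool = False,
) -> List[str]:
    """Build the run_on list from the good and/or previously failed hosts."""
    run_on: List[str] = []
    if on_good:
        run_on += [host for name, host in hosts.items() if name not in failed_hosts]
    if on_failed:
        run_on += [host for name, host in hosts.items() if name in failed_hosts]
    return run_on


# Hypothetical inventory: name -> host object (plain strings here).
inventory = {"sw1": "10.0.0.1", "sw2": "10.0.0.2", "sw3": "10.0.0.3"}
failed = {"sw2"}

default_run = select_hosts(inventory, failed)
retry_run = select_hosts(inventory, failed, on_good=False, on_failed=True)

print(default_run)
print(retry_run)
```

With the defaults, a host that failed in an earlier run is skipped; passing on_good=False and on_failed=True targets only the failed hosts, which is handy for retries.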

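  • 3.3.1 Step into the run() method of the runner plugin
  Following the reference in an IDE jumps to the runner interface definition; what we want is the concrete runner plugin implementation.
  As the code below shows, it submits Task.start to a thread pool (because config.yaml selects the threaded runner plugin).
# nornir/plugins/runners/__init__.py  -> ThreadedRunner.run()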
  def run(self, task: Task, hosts: List[Host]) -> AggregatedResult:
    result = AggregatedResult(task.name)
    futures = []
    with ThreadPoolExecutor(self.num_workers) as pool:
      for host in hosts:
        future = pool.submit(task.copy().start, host)
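        futures.append(future)
    # remainder of the method as found in nornir 3.x: gather each future's result, keyed by host name
    for future in futures:
      worker_result = future.result()
      result[worker_result.host.name] = worker_result
    return result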
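The thread-pool pattern used by the threaded runner can be tried without any devices. In this sketch `run_threaded` and `fake_task` are hypothetical names: one job is submitted per host, and each future's result is then read back and stored by host.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Dict, List


def run_threaded(
    work: Callable[[str], str], hosts: List[str], num_workers: int = 4
) -> Dict[str, str]:
    """Submit one job per host, then gather every result keyed by host."""
    futures: Dict[str, Future] = {}
    with ThreadPoolExecutor(num_workers) as pool:
        for host in hosts:
            futures[host] = pool.submit(work, host)
    # Leaving the "with" block waits for all submitted jobs to finish.
    return {host: future.result() for host, future in futures.items()}


def fake_task(host: str) -> str:
    # Placeholder for task.copy().start(host); no network access.
    return f"{host}: ok"


results = run_threaded(fake_task, ["sw1", "sw2", "sw3"])
print(results)
```

Each host gets its own copy of the task in the real code, so per-host state such as `self.host` and `self.results` is not shared between threads.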

  • 3.3.2 A closer look at the key line from the code above
  future = pool.submit(task.copy().start, host)
# copy() is simple: it instantiates a new Task from the attributes of the current Task instance
  def copy(self) -> "Task":
    return Task(
      self.task,
      self.nornir,
      self.global_dry_run,
      self.processors,
      self.name,
      self.severity_level,
      self.parent_task,
      **self.params
    )
  def __repr__(self) -> str:
    return self.name
  # start() is the method that matters
  def start(self, host: "Host") -> "MultiResult":
    """
    Run the task for the given host.
    Arguments:
      host (:obj:`nornir.core.inventory.Host`): Host we are operating with. Populated right
        before calling the ``task``
      nornir(:obj:`nornir.core.Nornir`): Populated right before calling
        the ``task``
    Returns:
      host (:obj:`nornir.core.task.MultiResult`): Results of the task and its subtasks
    """
    self.host = host
    if self.parent_task is not None:
      self.processors.subtask_instance_started(self, host)
    else:
      self.processors.task_instance_started(self, host)
    try:
      logger.debug("Host %r: running task %r", self.host.name, self.name)
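      # Key point: self.task is the custom function, not a Task object. Its first argument is self (the Task), which is why, as noted in 3.1, a custom task function must declare one parameter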
      r = self.task(self, **self.params)
      if not isinstance(r, Result):
        r = Result(host=host, result=r)
    except NornirSubTaskError as e:
      tb = traceback.format_exc()
      logger.error(
        "Host %r: task %r failed with traceback:\n%s",
        self.host.name,
        self.name,
        tb,
      )
      r = Result(host, exception=e, result=str(e), failed=True)
    except Exception as e:
      tb = traceback.format_exc()
      logger.error(
        "Host %r: task %r failed with traceback:\n%s",
        self.host.name,
        self.name,
        tb,
      )
      r = Result(host, exception=e, result=tb, failed=True)
    r.name = self.name
    if r.severity_level == DEFAULT_SEVERITY_LEVEL:
      if r.failed:
        r.severity_level = logging.ERROR
      else:
        r.severity_level = self.severity_level
    self.results.insert(0, r)
    if self.parent_task is not None:
      self.processors.subtask_instance_completed(self, host, self.results)
    else:
      self.processors.task_instance_completed(self, host, self.results)
    return self.results
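The result handling in start() follows a common pattern: wrap plain return values, and turn any exception into a failed result instead of letting it escape. A minimal sketch, where `MiniResult` and the module-level `start` function are hypothetical simplifications of Result and Task.start:

```python
import traceback
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class MiniResult:
    """Toy stand-in for nornir's Result."""
    host: str
    result: Any = None
    failed: bool = False
    exception: Optional[BaseException] = None


def start(func: Callable[[str], Any], host: str) -> MiniResult:
    try:
        r = func(host)
        if not isinstance(r, MiniResult):
            r = MiniResult(host=host, result=r)   # wrap plain return values
    except Exception as e:
        # The traceback text becomes the result, and the host is marked failed.
        r = MiniResult(host=host, result=traceback.format_exc(), failed=True, exception=e)
    return r


def good(host: str) -> str:
    return "uptime 3 days"


def bad(host: str) -> str:
    raise ValueError("connection refused")


ok_result = start(good, "sw1")
failed_result = start(bad, "sw1")
print(ok_result.failed, failed_result.failed)
```

Because failures are captured per host, one unreachable device does not abort the run for the others.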

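  • 3.4 Next, look at the Task.run() method
  Do not confuse it with Nornir.run(), which dispatches through the runner
# nornir/core/task.py -> Task.run()
  # Task.run() is what a custom function uses to call another task function (a form of nesting)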
  def run(self, task: Callable[..., Any], **kwargs: Any) -> "MultiResult":
    """
    This is a utility method to call a task from within a task. For instance:
      def grouped_tasks(task):
        task.run(my_first_task)
        task.run(my_second_task)
      nornir.run(grouped_tasks)
    This method will ensure the subtask is run only for the host in the current thread.
    """
    if not self.host:
      msg = (
        "You have to call this after setting host and nornir attributes. ",
        "You probably called this from outside a nested task",
      )
      raise Exception(msg)
    if "severity_level" not in kwargs:
      kwargs["severity_level"] = self.severity_level
    # Key point 1: wrap the nested task function in a Task object
    run_task = Task(
      task,
      self.nornir,
      global_dry_run=self.global_dry_run,
      processors=self.processors,
      parent_task=self,
      **kwargs
    )
    # Key point 2: call start(), the Task.start discussed in 3.3.2 above
    r = run_task.start(self.host)
    self.results.append(r[0] if len(r) == 1 else cast("Result", r))
    if r.failed:
      # Without this we will keep running the grouped task
      raise NornirSubTaskError(task=run_task, result=r)
    return r
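The ordering of results for nested tasks is easy to get wrong, so here is a small sketch of it. `MiniTask` and the three task functions are hypothetical; failure propagation (NornirSubTaskError) and processors are left out.

```python
from typing import Any, Callable, List, Optional


class MiniTask:
    """Toy stand-in for Task, keeping only the result bookkeeping."""

    def __init__(self, func: Callable[..., Any], parent: Optional["MiniTask"] = None) -> None:
        self.func = func
        self.parent = parent
        self.host: Optional[str] = None
        self.results: List[Any] = []

    def start(self, host: str) -> List[Any]:
        self.host = host
        own = self.func(self)
        self.results.insert(0, own)       # the task's own result goes first
        return self.results

    def run(self, func: Callable[..., Any]) -> List[Any]:
        if not self.host:
            raise RuntimeError("run() must be called from inside a running task")
        child = MiniTask(func, parent=self)
        r = child.start(self.host)        # same host as the parent
        self.results.append(r[0] if len(r) == 1 else r)
        return r


def get_version(task: MiniTask) -> str:
    return f"{task.host}: version"


def get_uptime(task: MiniTask) -> str:
    return f"{task.host}: uptime"


def grouped(task: MiniTask) -> str:
    task.run(get_version)
    task.run(get_uptime)
    return "grouped done"


all_results = MiniTask(grouped).start("sw1")
print(all_results)
```

Subtask results are appended while the parent function runs, and the parent's own result is inserted at index 0 afterwards. This is why results['h3c_sw'][0] in the example script is the result of show_cmds itself.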
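4. Summary

  The code above should give a rough picture of how Nornir is implemented under the hood.
  To really understand it, step through it yourself with a debugger and breakpoints; reading alone quickly gets confusing.
  P.S.
The processor mechanism could be covered as well: when a Task reaches a given state, the corresponding method of each processor class is called, and the code walkthrough above already hints at how that works. For reasons of length I will stop here. Corrections are welcome, thank you!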
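The processor idea can be sketched as a class whose methods are called at fixed points of a run. This is a simplified illustration: `RecordingProcessor` and `run_with_processor` are hypothetical, the hooks take plain names instead of Task objects, and hosts run serially for readability.

```python
from typing import Any, Callable, Dict, List


class RecordingProcessor:
    """Toy processor: records which hook fired, in order."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def task_started(self, name: str) -> None:
        self.events.append(f"started:{name}")

    def task_instance_started(self, name: str, host: str) -> None:
        self.events.append(f"instance_started:{name}:{host}")

    def task_instance_completed(self, name: str, host: str, result: Any) -> None:
        self.events.append(f"instance_completed:{name}:{host}")

    def task_completed(self, name: str) -> None:
        self.events.append(f"completed:{name}")


def run_with_processor(
    func: Callable[[str], Any], hosts: List[str], processor: RecordingProcessor
) -> Dict[str, Any]:
    name = func.__name__
    processor.task_started(name)                      # once, before any host
    results: Dict[str, Any] = {}
    for host in hosts:
        processor.task_instance_started(name, host)   # once per host
        results[host] = func(host)
        processor.task_instance_completed(name, host, results[host])
    processor.task_completed(name)                    # once, after all hosts
    return results


def ping(host: str) -> str:
    return f"{host} reachable"


recorder = RecordingProcessor()
run_with_processor(ping, ["sw1", "sw2"], recorder)
print(recorder.events)
```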