# nornir.core.task.py -> Task.start()
r = self.task(self, **self.params) # self.task here is the user-defined task function (show_cmds in this walkthrough)
3.2 InitNornir():根据配置文件或关键字参数初始化配置项
def InitNornir(config_file: str = "", dry_run: bool = False, **kwargs: Any,) -> Nornir:
    """
    Build a Nornir object from a configuration file and/or keyword arguments.

    Arguments:
        config_file(str): Path to the configuration file (optional). When it is
            empty the configuration is built from ``kwargs`` only.
        dry_run(bool): Whether to simulate changes or not
        **kwargs: Extra information to pass to the
            :obj:`nornir.core.configuration.Config` object

    Returns:
        :obj:`nornir.core.Nornir`: fully instantiated and configured
    """
    # Register the connection plugins before anything else is built.
    ConnectionPluginRegister.auto_register()

    # A configuration file, when given, is combined with the keyword arguments;
    # otherwise the keyword arguments alone describe the configuration.
    config = (
        Config.from_file(config_file, **kwargs)
        if config_file
        else Config.from_dict(**kwargs)
    )
    state = GlobalState(dry_run=dry_run)
    config.logging.configure()

    # Load the inventory and runner selected by the configuration, then hand
    # everything to the Nornir instance that is returned to the caller.
    inventory = load_inventory(config)
    runner = load_runner(config)
    return Nornir(inventory=inventory, runner=runner, config=config, data=state)
# nornir.core.__init__.py -> Nornir.run()
def run(
    self,
    task,
    raise_on_error=None,
    on_good=True,
    on_failed=False,
    name: Optional[str] = None,
    **kwargs,
):
    """
    Run task over all the hosts in the inventory.

    Arguments:
        task (``callable``): function or callable that will be run against each device in
          the inventory
        raise_on_error (``bool``): Override raise_on_error behavior
        on_good(``bool``): Whether to run or not this task on hosts marked as good
        on_failed(``bool``): Whether to run or not this task on hosts marked as failed
        name (``str``): forwarded as ``name`` to the ``Task`` wrapper
        **kwargs: additional argument to pass to ``task`` when calling it

    Raises:
        :obj:`nornir.core.exceptions.NornirExecutionError`: if at least a task fails
          and self.config.core.raise_on_error is set to ``True``

    Returns:
        :obj:`nornir.core.task.AggregatedResult`: results of each execution
    """
    # Wrap the plain callable in a Task; the extra kwargs travel with it.
    task = Task(
        task,
        self,
        global_dry_run=self.data.dry_run,
        name=name,
        processors=self.processors,
        **kwargs,
    )
    # Notify the processors once, before any host is handled.
    self.processors.task_started(task)

    # Select the hosts to run on: "good" hosts first, then "failed" ones.
    run_on = []
    if on_good:
        run_on.extend(
            host
            for hostname, host in self.inventory.hosts.items()
            if hostname not in self.data.failed_hosts
        )
    if on_failed:
        run_on.extend(
            host
            for hostname, host in self.inventory.hosts.items()
            if hostname in self.data.failed_hosts
        )

    # The logged count is the size of the whole inventory, not of ``run_on``.
    host_count = len(self.inventory.hosts)
    if not host_count:
        logger.warning("Task %r has not been run – 0 hosts selected", task.name)
    else:
        logger.info(
            "Running task %r with args %s on %d hosts",
            task.name,
            kwargs,
            host_count,
        )

    # Delegate the actual execution to the configured runner.
    result = self.runner.run(task, run_on)

    # An explicit argument wins; otherwise fall back to the configured default.
    if raise_on_error is None:
        raise_on_error = self.config.core.raise_on_error

    if raise_on_error:
        result.raise_on_error()
    else:
        # Remember the hosts that failed so later runs can filter on them.
        self.data.failed_hosts.update(result.failed_hosts.keys())

    # Notify the processors that the task finished, then return the results.
    self.processors.task_completed(task, result)
    return result
# nornir/plugins/runners/__init__.py -> ThreadedRunner.run()
def run(self, task: Task, hosts: List[Host]) -> AggregatedResult:
    # Submit one copy of ``task`` per host to a thread pool.
    # NOTE(review): this excerpt stops after the submit loop; the code that
    # collects the futures into ``result`` and returns it is not shown here.
    result = AggregatedResult(task.name)
    futures = []
    # The pool is sized by self.num_workers.
    with ThreadPoolExecutor(self.num_workers) as pool:
        for host in hosts:
            # Each host gets its own Task copy; its start() is scheduled with
            # that host as the only argument.
            future = pool.submit(task.copy().start, host)
            futures.append(future)
3.3.2 再深入看看上面代码中的重点部分
future = pool.submit(task.copy().start, host) # schedule start(host) of a fresh Task copy on the pool
# copy比较简单 直接把Task实例对象的属性 用来再实例化Task
def copy(self) -> "Task":
    """
    Create a new Task from this task's attributes.

    Returns:
        A Task constructed from the same callable, nornir object, dry-run
        flag, processors, name, severity level, parent task and params.
    """
    clone = Task(
        self.task,
        self.nornir,
        self.global_dry_run,
        self.processors,
        self.name,
        self.severity_level,
        self.parent_task,
        **self.params
    )
    return clone
def __repr__(self) -> str:
    """Represent the task by its name."""
    task_name = self.name
    return task_name
# 这个start方法是重点
def start(self, host: "Host") -> "MultiResult":
    """
    Run the task for the given host.

    Arguments:
        host (:obj:`nornir.core.inventory.Host`): Host we are operating with. It is
          stored on ``self.host`` right before calling the ``task``

    Returns:
        host (:obj:`nornir.core.task.MultiResult`): Results of the task and its subtasks
    """
    self.host = host

    # Subtasks and top-level tasks notify different processor hooks.
    notify_started = (
        self.processors.subtask_instance_started
        if self.parent_task is not None
        else self.processors.task_instance_started
    )
    notify_started(self, host)

    def _log_failure() -> str:
        # Log the active exception's traceback and hand the text back.
        trace = traceback.format_exc()
        logger.error(
            "Host %r: task %r failed with traceback:\n%s",
            self.host.name,
            self.name,
            trace,
        )
        return trace

    try:
        logger.debug("Host %r: running task %r", self.host.name, self.name)
        # self.task is the user-supplied callable, not a Task object; it is
        # given this Task as its first argument, so it must accept one.
        outcome = self.task(self, **self.params)
        if not isinstance(outcome, Result):
            # Wrap plain return values so callers always get a Result.
            outcome = Result(host=host, result=outcome)
    except NornirSubTaskError as err:
        # A failed subtask: report the error message as the result text.
        _log_failure()
        outcome = Result(host, exception=err, result=str(err), failed=True)
    except Exception as err:
        # Any other failure: report the full traceback as the result text.
        outcome = Result(host, exception=err, result=_log_failure(), failed=True)

    outcome.name = self.name
    # Only fill in a severity when the result still carries the default one.
    if outcome.severity_level == DEFAULT_SEVERITY_LEVEL:
        outcome.severity_level = (
            logging.ERROR if outcome.failed else self.severity_level
        )

    # This task's own result goes first in the results list.
    self.results.insert(0, outcome)

    notify_completed = (
        self.processors.subtask_instance_completed
        if self.parent_task is not None
        else self.processors.task_instance_completed
    )
    notify_completed(self, host, self.results)
    return self.results
3.4 下面再看一下Task.run()这个方法
注意跟Nornir.runner.run()区分,不要搞混淆
# nornir/core/task.py -> Task.run()
# 这个Task里面的run 是自定义函数调用 其他task函数时用的(有点嵌套的感觉)
def run(self, task: Callable[..., Any], **kwargs: Any) -> "MultiResult":
    """
    This is a utility method to call a task from within a task. For instance:

        def grouped_tasks(task):
            task.run(my_first_task)
            task.run(my_second_task)

        nornir.run(grouped_tasks)

    This method will ensure the subtask is run only for the host in the current thread.

    Arguments:
        task: callable to run as a subtask of this task
        **kwargs: passed to the subtask's ``Task`` wrapper; ``severity_level``
          defaults to this task's severity level when not given

    Raises:
        Exception: if ``self.host`` is not set, i.e. the method was called
          outside of a running task
        NornirSubTaskError: if the subtask's result is marked as failed

    Returns:
        The results returned by the subtask's ``start()``
    """
    if not self.host:
        # Adjacent string literals form ONE message. Separating them with
        # commas would build a tuple and the exception text would be unreadable.
        msg = (
            "You have to call this after setting host and nornir attributes. "
            "You probably called this from outside a nested task"
        )
        raise Exception(msg)

    if "severity_level" not in kwargs:
        kwargs["severity_level"] = self.severity_level

    # Step 1: wrap the nested callable in its own Task, linked to this one
    # through parent_task.
    run_task = Task(
        task,
        self.nornir,
        global_dry_run=self.global_dry_run,
        processors=self.processors,
        parent_task=self,
        **kwargs
    )
    # Step 2: run it through Task.start() for the current host only.
    r = run_task.start(self.host)
    # A single result is stored unwrapped; several are stored as a group.
    self.results.append(r[0] if len(r) == 1 else cast("Result", r))

    if r.failed:
        # Without this we will keep running the grouped task
        raise NornirSubTaskError(task=run_task, result=r)

    return r