目录

快速指南

Concurrency in Python - Introduction

在本章中,我们将了解Python中的并发概念,并了解不同的线程和进程。

什么是并发?

简单来说,并发性是同时发生的两个或多个事件。 并发是一种自然现象,因为许多事件在任何给定时间同时发生。

在编程方面,并发性是指两个任务在执行时重叠。 通过并发编程,我们的应用程序和软件系统的性能可以得到改善,因为我们可以同时处理请求而不是等待前一个请求完成。

并发的历史回顾

以下几点将为我们提供并发的简要历史回顾 -

从铁路的概念

并发性与铁路概念密切相关。 对于铁路,需要在同一铁路系统上处理多列火车,以便每辆火车都能安全到达目的地。

学术界的并行计算

计算机科学并发的兴趣始于Edsger W. Dijkstra于1965年发表的研究论文。在本文中,他发现并解决了互斥问题,即并发控制的特性。

高级并发原语

最近,由于引入了高级并发原语,程序员正在改进并发解决方案。

使用编程语言改进了并发性

Google的Golang,Rust和Python等编程语言在帮助我们获得更好的并发解决方案的领域取得了令人难以置信的发展。

什么是线程和多线程?

Thread是可以在操作系统中执行的最小执行单元。 它本身不是一个程序,而是在一个程序中运行。 换句话说,线程不是彼此独立的。 每个线程与其他线程共享代码段,数据段等。 它们也被称为轻量级过程。

线程由以下组件组成 -

  • 程序计数器,包含下一个可执行指令的地址

  • Stack

  • 寄存器集

  • 一个独特的身份

另一方面, Multithreading是CPU通过并发执行多个线程来管理操作系统使用的能力。 多线程的主要思想是通过将进程划分为多个线程来实现并行性。 在以下示例的帮助下,可以理解多线程的概念。

例子 (Example)

假设我们正在运行一个特定的过程,其中我们打开MS Word以在其中键入内容。 将分配一个线程来打开MS Word,并且需要另一个线程在其中键入内容。 现在,如果我们想要编辑现有的,那么另一个线程将需要执行编辑任务,依此类推。

什么是流程和多处理?

process定义为实体,表示要在系统中实施的基本工作单元。 简单来说,我们将计算机程序写入文本文件中,当我们执行该程序时,它将成为执行程序中提到的所有任务的过程。 在过程生命周期中,它经过不同的阶段 - 开始,准备,运行,等待和终止。

下图显示了流程的不同阶段 -

多

一个进程只能有一个线程,称为主线程,或者多个线程有自己的寄存器,程序计数器和堆栈。 下图将向我们展示差异 -

多处理一

另一方面, Multiprocessing,是在单个计算机系统中使用两个或更多个CPU单元。 我们的主要目标是充分发挥硬件的潜力。 为此,我们需要利用计算机系统中可用的全部CPU内核。 多处理是这样做的最佳方法。

多处理二

Python是最流行的编程语言之一。 以下是使其适用于并发应用程序的一些原因 -

句法糖

语法糖是编程语言中的语法,旨在使事物更易于阅读或表达。 它使语言“更甜”供人类使用:事物可以更清晰,更简洁地表达,或者根据偏好以另一种方式表达。 Python附带了Magic方法,可以定义它们作用于对象。 这些Magic方法被用作语法糖并绑定到更易于理解的关键字。

大社区

Python语言已经见证了数据科学家和数学家的大量采用率,他们在AI,机器学习,深度学习和定量分析领域工作。

用于并发编程的有用API

Python 2和3拥有大量专用于并行/并发编程的API。 其中最受欢迎的是threading, concurrent.features, multiprocessing, asyncio, gevent and greenlets,等。

Python在实现并发应用程序中的局限性

Python对并发应用程序有限制。 这种限制在Python中存在,称为GIL (Global Interpreter Lock) 。 GIL从不允许我们使用多个CPU内核,因此我们可以说Python中没有真正的线程。 我们可以理解GIL的概念如下 -

GIL (Global Interpreter Lock)

它是Python世界中最具争议的话题之一。 在CPython中,GIL是互斥锁 - 互斥锁,它使线程安全。 换句话说,我们可以说GIL阻止多个线程并行执行Python代码。 锁一次只能由一个线程保存,如果我们想要执行一个线程,那么它必须首先获取锁。 下面的图表将帮助您了解GIL的工作情况。

限制

但是,Python中有一些库和实现,如Numpy, JpythonIronPytbhon. 这些库在没有与GIL交互的情况下工作。

Concurrency vs Parallelism

并发和并行都用于多线程程序,但是它们之间的相似性和差异存在很多混淆。 这方面的一个重要问题是:并发并行性与否? 尽管这两个术语看起来非常相似,但上述问题的答案是否定的,但并发性和并行性并不相同。 现在,如果它们不相同,那么它们之间的基本区别是什么?

简单来说,并发性涉及管理来自不同线程的共享状态访问,另一方面,并​​行性涉及利用多个CPU或其核心来提高硬件性能。

并发细节

并发是指两个任务在执行时重叠。 可能是应用程序同时在多个任务上进行的情况。 我们可以用图解理解它; 多项任务正在同时取得进展,如下 -

并发

并发级别

在本节中,我们将从编程方面讨论三个重要的并发级别 -

Low-Level Concurrency

在这种并发级别中,明确使用原子操作。 我们不能在应用程序构建中使用这种并发性,因为它非常容易出错且难以调试。 甚至Python也不支持这种并发。

Mid-Level Concurrency

在这种并发中,没有使用显式原子操作。 它使用显式锁。 Python和其他编程语言支持这种并发。 大多数应用程序员使用这种并发性。

High-Level Concurrency

在这种并发中,既不使用显式原子操作也不使用显式锁。 Python有concurrent.futures模块来支持这种并发。

并发系统的属性

要使程序或并发系统正确,必须满足某些属性。 与终止系统有关的属性如下 -

正确的财产

正确性属性意味着程序或系统必须提供所需的正确答案。 为了简单起见,我们可以说系统必须正确地将起始程序状态映射到最终状态。

安全财产

安全属性意味着程序或系统必须保持“good”“safe”状态,并且不会做任何“bad”

生活财产

此属性意味着程序或系统必须“make progress”并且它将达到某种理想状态。

并发系统的参与者

这是并发系统的一个常见属性,其中可以有多个进程和线程,它们同时运行以在自己的任务上取得进展。 这些进程和线程称为并发系统的actor。

并发系统的资源

演员必须利用内存,磁盘,打印机等资源才能执行任务。

某些规则

每个并发系统必须拥有一套规则来定义演员要执行的任务类型以及每个系统的时间安排。 任务可能是获取锁,内存共享,修改状态等。

并发系统的障碍

在实现并发系统时,程序员必须考虑以下两个重要问题,这些问题可能是并发系统的障碍 -

共享数据

实现并发系统时的一个重要问题是在多个线程或进程之间共享数据。 实际上,程序员必须确保锁保护共享数据,以便对其进行所有访问,并且一次只有一个线程或进程可以访问共享数据。 如果多个线程或进程都试图访问相同的共享数据,那么除了其中至少一个之外的所有线程或进程都不会被阻止并且将保持空闲状态。 换句话说,我们可以说当锁定生效时我们只能使用一个进程或线程。 可以有一些简单的解决方案来消除上述障碍 -

数据共享限制

最简单的解决方案是不共享任何可变数据。 在这种情况下,我们不需要使用显式锁定,并且可以解决由于相互数据导致的并发障碍。

数据结构协助

很多时候并发进程需要同时访问相同的数据。 除了使用显式锁之外,另一种解决方案是使用支持并发访问的数据结构。 例如,我们可以使用queue模块,它提供线程安全的队列。 我们还可以使用multiprocessing.JoinableQueue类来实现基于多处理的并发。

不可变数据传输

有时,我们使用的数据结构,比如并发队列,不适合我们可以传递不可变数据而不锁定它。

可变数据传输

继续上述解决方案,假设是否只需要传递可变数据而不是不可变数据,那么我们就可以传递只读的可变数据。

共享I/O资源

实现并发系统的另一个重要问题是线程或进程使用I/O资源。 当一个线程或进程使用I/O这么长时间而其他线程或进程空闲时,就会出现问题。 在处理I/O繁重的应用程序时,我们可以看到这种障碍。 在示例的帮助下可以理解从Web浏览器请求页面。 这是一个繁重的应用程序。 在这里,如果请求数据的速率低于它的消耗速率,那么我们的并发系统中就有I/O障碍。

以下Python脚本用于请求网页并获取我们的网络获取所请求页面所花费的时间 -

import urllib.request
import time
ts = time.time()
req = urllib.request.urlopen('http://www.iowiki.com')
pageHtml = req.read()
te = time.time()
print("Page Fetching Time : {} Seconds".format (te-ts))

执行上述脚本后,我们可以获取页面获取时间,如下所示。

输出 (Output)

Page Fetching Time: 1.0991398811340332 Seconds

我们可以看到获取页面的时间超过一秒。 现在,如果我们想要获取数千个不同的网页,您可以了解我们的网络将花费多少时间。

什么是Parallelism?

并行性可以定义为将任务分成可以同时处理的子任务的技术。 如上所述,它与并发性相反,其中两个或多个事件同时发生。 我们可以用图解理解它; 任务分为若干可以并行处理的子任务,如下所示 -

排比

要更多地了解并发性和并行性之间的区别,请考虑以下几点 -

并发但不平行

应用程序可以是并发但不是并行意味着它同时处理多个任务,但任务不会分解为子任务。

并行但不并发

应用程序可以是并行但不是并发意味着它一次只能在一个任务上运行,并且可以并行处理分解为子任务的任务。

无论是并行还是并发

应用程序既不能并行也不能并发。 这意味着它一次只能处理一个任务,并且任务永远不会分解为子任务。

并行和并发

应用程序可以是并行和并发意味着它既可以一次处理多个任务,也可以将任务分解为子任务以并行执行它们。

并行性的必要性

我们可以通过在单个CPU的不同核心之间或在网络内连接的多个计算机之间分配子任务来实现并行性。

考虑以下要点,以了解为什么有必要实现并行性 -

高效的代码执行

借助并行性,我们可以有效地运行代码。 它将节省我们的时间,因为部分中的相同代码并行运行。

比顺序计算更快

顺序计算受物理和实际因素的限制,因此不可能获得更快的计算结果。 另一方面,这个问题通过并行计算解决,并且比顺序计算给我们更快的计算结果。

减少执行时间

并行处理减少了程序代码的执行时间。

如果我们谈论并行的真实例子,我们计算机的显卡就是突出并行处理真正力量的例子,因为它有数百个独立处理的内核,可以独立工作并且可以同时执行。 由于这个原因,我们也能够运行高端应用程序和游戏。

了解处理器的实现

我们知道并发性,并行性和它们之间的区别,但是它实现的系统又如何呢? 理解我们将要实施的系统是非常必要的,因为它为我们提供了在设计软件时做出明智决策的好处。 我们有以下两种处理器 -

Single-core processors

单核处理器能够在任何给定时间执行一个线程。 这些处理器使用context switching在特定时间存储线程的所有必要信息,然后再恢复信息。 上下文切换机制可以帮助我们在给定秒内的多个线程上取得进展,看起来好像系统正在处理多个事情。

单核处理器具有许多优点。 这些处理器需要更少的功率,并且多个核之间没有复杂的通信协议。 另一方面,单核处理器的速度有限,不适合大型应用。

Multi-core processors

多核处理器具有多个独立的处理单元,也称为cores

这样的处理器不需要上下文切换机制,因为每个核包含执行一系列存储指令所需的一切。

Fetch-Decode-Execute Cycle

多核处理器的核心遵循一个执行周期。 该循环称为Fetch-Decode-Execute循环。 它涉及以下步骤 -

Fetch

这是循环的第一步,它涉及从程序存储器中取出指令。

Decode

最近获取的指令将被转换为一系列信号,这些信号将触发CPU的其他部分。

执行 (Execute)

这是执行读取和解码指令的最后一步。 执行结果将存储在CPU寄存器中。

这里的一个优点是多核处理器中的执行速度比单核处理器的执行速度快。 它适用于大型应用。 另一方面,多核之间的复杂通信协议是一个问题。 多核需要比单核处理器更多的功率。

System and Memory Architecture

在设计程序或并发系统时,需要考虑不同的系统和内存架构样式。 这是非常必要的,因为一个系统和内存样式可能适合于一个任务,但可能容易出错其他任务。

支持并发的计算机系统体系结构

Michael Flynn于1972年对分类不同风格的计算机系统架构进行了分类。 该分类法定义了以下四种不同的样式 -

  • 单指令流,单数据流(SISD)
  • 单指令流,多数据流(SIMD)
  • 多指令流,单数据流(MISD)
  • 多指令流,多数据流(MIMD)。

单指令流,单数据流(SISD)

顾名思义,这种类型的系统将具有一个顺序输入数据流和一个单个处理单元来执行数据流。 它们就像具有并行计算架构的单处理器系统。 以下是SISD的架构 -

SSID

SISD的优点

SISD架构的优点如下 -

  • 它需要更少的电力。
  • 多核之间没有复杂通信协议的问题。

SISD的缺点

SISD架构的缺点如下 -

  • SISD架构的速度与单核处理器一样有限。
  • 它不适合大型应用。

单指令流,多数据流(SIMD)

顾名思义,这种类型的系统将具有多个输入数据流和多个处理单元,这些处理单元可以在任何给定时间作用于单个指令。 它们就像具有并行计算架构的多处理器系统。 以下是SIMD的架构 -

SIMD

SIMD的最佳示例是显卡。 这些卡有数百个独立的处理单元。 如果我们谈论SISD和SIMD之间的计算差异,那么对于添加数组[5, 15, 20] 5,15,20 [5, 15, 20][15, 25, 10], 15,25,10 [15, 25, 10], SISD架构必须执行三种不同的添加操作。 另一方面,使用SIMD架构,我们可以在单个添加操作中添加。

SIMD的优点

SIMD架构的优点如下 -

  • 可以仅使用一个指令来执行对多个元素的相同操作。

  • 通过增加处理器的核心数量可以增加系统的吞吐量。

  • 处理速度高于SISD架构。

SIMD的缺点

SIMD架构的缺点如下 -

  • 处理器的核心数量之间存在复杂的通信。
  • 成本高于SISD架构。

多指令单数据(MISD)流

具有MISD流的系统具有通过在同一数据集上执行不同指令来执行不同操作的多个处理单元。 以下是MISD的架构 -

MISD

MISD架构的代表尚未商业化。

多指令多数据(MIMD)流

在使用MIMD架构的系统中,多处理器系统中的每个处理器可以在不同的数据集并行上独立地执行不同的指令集。 它与SIMD架构相反,在SIMD架构中,对多个数据集执行单个操作。 以下是MIMD的架构 -

MIMD

普通的多处理器使用MIMD架构。 这些架构主要用于许多应用领域,如计算机辅助设计/计算机辅助制造,仿真,建模,通信开关等。

支持并发的内存架构

在使用并发和并行等概念的同时,总是需要加速程序。 计算机设计者发现的一个解决方案是创建共享存储器多计算机,即具有单个物理地址空间的计算机,其由处理器所具有的所有核访问。 在这种情况下,可以有许多不同风格的架构,但以下是三种重要的架构风格 -

UMA (Uniform Memory Access)

在此模型中,所有处理器均匀地共享物理内存。 所有处理器对所有存储器字具有相同的访问时间。 每个处理器可以具有专用高速缓冲存储器 外围设备遵循一系列规则。

当所有处理器对所有外围设备具有相同的访问权限时,该系统称为symmetric multiprocessor 。 当只有一个或几个处理器可以访问外围设备时,该系统称为asymmetric multiprocessor

UMA

Non-uniform Memory Access (NUMA)

在NUMA多处理器模型中,访问时间随存储器字的位置而变化。 这里,共享存储器物理地分布在所有处理器中,称为本地存储器。 所有本地存储器的集合形成全局地址空间,可由所有处理器访问。

NUMA

仅缓存内存架构(COMA)

COMA模型是NUMA模型的专用版本。 这里,所有分布式主存储器都被转换为高速缓冲存储器。

昏迷

Concurrency in Python - Threads

一般来说,正如我们所知,线是一种非常薄的扭曲线,通常是棉或丝织物,用于缝制衣服等。 同一术语线程也用于计算机编程领域。 现在,我们如何将用于缝制衣服的线程与用于计算机编程的线程联系起来? 这两个线程执行的角色类似。 在衣服中,线将布料保持在一起,在另一侧,在计算机编程中,线程保持计算机程序并允许程序一次执行顺序动作或许多动作。

Thread是操作系统中最小的执行单元。 它本身不是一个程序,而是在一个程序中运行。 换句话说,线程不是彼此独立的,而是与其他线程共享代码段,数据段等。 这些线程也称为轻量级进程。

线程状态

为了深入理解线程的功能,我们需要了解线程的生命周期或不同的线程状态。 通常,线程可以以五种不同的状态存在。 不同的州显示如下 -

新线程

新线程在新状态下开始其生命周期。 但是,在这个阶段,它还没有开始,也没有分配任何资源。 我们可以说它只是一个对象的实例。

Runnable

当新生的线程启动时,线程变为可运行,即等待运行。 在此状态下,它具有所有资源但仍未安排任务计划程序运行。

Running

在此状态下,线程进行并执行任务,任务调度程序选择该任务来运行。 现在,线程可以进入死状态或非可运行/等待状态。

Non-running/waiting

在此状态下,线程暂停,因为它正在等待某些I/O请求的响应或等待其他线程的执行完成。

Dead

可运行线程在完成任务或以其他方式终止时进入终止状态。

下图显示了线程的完整生命周期 -

死

线程类型

在本节中,我们将看到不同类型的线程。 类型如下所述 -

用户级线程

这些是用户管理的线程。

在这种情况下,线程管理内核不知道线程的存在。 线程库包含用于创建和销毁线程的代码,用于在线程之间传递消息和数据,用于调度线程执行以及用于保存和恢复线程上下文的代码。 应用程序以单个线程开始。

用户级线程的示例是 -

  • Java线程
  • POSIX线程
死

用户级线程的优点

以下是用户级线程的不同优点 -

  • 线程切换不需要内核模式权限。
  • 用户级线程可以在任何操作系统上运行。
  • 调度可以是用户级线程中的特定于应用程序。
  • 用户级线程可以快速创建和管理。

用户级线程的缺点

以下是用户级线程的不同缺点 -

  • 在典型的操作系统中,大多数系统调用都是阻塞的。
  • 多线程应用程序无法利用多处理。

内核级线程

操作系统托管线程作用于内核,内核是操作系统核心。

在这种情况下,内核进行线程管理。 应用程序区域中没有线程管理代码。 内核线程由操作系统直接支持。 任何应用程序都可以编程为多线程。 应用程序中的所有线程都在单个进程中受支持。

内核维护整个流程和流程中各个线程的上下文信息。 内核调度是基于线程完成的。 内核在内核空间中执行线程创建,调度和管理。 内核线程的创建和管理速度通常比用户线程慢。 内核级线程的示例是Windows,Solaris。

死

内核级线程的优点

以下是内核级线程的不同优点 -

  • 内核可以在多个进程上同时调度来自同一进程的多个线程。

  • 如果进程中的一个线程被阻塞,则内核可以调度同一进程的另一个线程。

  • 内核例程本身可以是多线程的。

内核级线程的缺点

  • 内核线程的创建和管理速度通常比用户线程慢。

  • 在同一进程中将控制从一个线程转移到另一个线程需要模式切换到内核。

线程控制块 - TCB

线程控制块(TCB)可以定义为操作系统内核中主要包含线程信息的数据结构。 存储在TCB中的特定于线程的信息将突出显示有关每个进程的一些重要信息。

考虑以下与TCB中包含的线程相关的要点 -

  • Thread identification - 它是分配给每个新线程的唯一线程标识(tid)。

  • Thread state - 它包含与Thread state的状态(Running,Runnable,Non-Running,Dead)相关的信息。

  • Program Counter (PC) - 它指向线程的当前程序指令。

  • Register set - 它包含分配给它们的线程寄存器值以进行计算。

  • Stack Pointer - 它指向进程中线程的堆栈。 它包含线程范围内的局部变量。

  • Pointer to PCB的指针 - 它包含指向创建该线程的进程的指针。

PCB

进程和线程之间的关系

在多线程中,进程和线程是两个非常密切相关的术语,它们具有相同的目标,使计算机能够一次完成多个任务。 进程可以包含一个或多个线程,但相反,线程不能包含进程。 但是,它们仍然是两个基本的执行单位。 一个程序,执行一系列指令,启动进程和线程。

下表显示了进程和线程之间的比较 -

Sr.No. 处理 线
1 流程重量大或资源密集。 线程是轻量级的,比进程占用更少的资源。
2 进程切换需要与操作系统交互。 线程切换不需要与操作系统交互。
3 在多个处理环境中,每个进程执行相同的代码,但具有自己的内存和文件资源。 所有线程都可以共享同一组打开的文件,子进程。
4 如果一个进程被阻止,则在第一个进程被解除阻塞之前,不能执行任何其他进程。 当一个线程被阻塞并等待时,同一任务中的第二个线程可以运行。
5 不使用线程的多个进程使用更多资源。 多线程进程使用较少的资源。
6 在多个流程中,每个流程独立于其他流程运行。 一个线程可以读取,写入或更改另一个线程的数据。
7 如果父进程有任何更改,则它不会影响子进程。 如果主线程中有任何更改,那么它可能会影响该进程的其他线程的行为。
8 要与兄弟进程通信,进程必须使用进程间通信。 线程可以直接与该进程的其他线程通信。

多线程的概念

正如我们之前所讨论的那样,多线程是CPU通过并发执行多个线程来管理操作系统使用的能力。 多线程的主要思想是通过将进程划分为多个线程来实现并行性。 以更简单的方式,我们可以说多线程是通过使用线程概念实现多任务处理的方式。

在以下示例的帮助下,可以理解多线程的概念。

例子 (Example)

假设我们正在运行一个进程。 这个过程可能是为了写一些东西而打开MS字。 在这样的过程中,将分配一个线程来打开MS字,并且需要另一个线程来写。 现在,假设我们想要编辑某些内容,则需要另一个线程来执行编辑任务,依此类推。

下图帮助我们了解内存中存在多个线程 -

多线程

我们可以在上面的图中看到,在一个进程中可以存在多个线程,其中每个线程都包含自己的寄存器集和局部变量。 除此之外,进程中的所有线程共享全局变量。

多线程的优点

现在让我们看一下多线程的一些优点。 优点如下 -

  • Speed of communication - 多线程提高了计算速度,因为每个核心或处理器同时处理单独的线程。

  • Program remains responsive - 它允许程序保持响应,因为一个线程等待输入而另一个线程同时运行GUI。

  • Access to global variables - 在多线程中,特定进程的所有线程都可以访问全局变量,如果全局变量有任何变化,那么其他线程也可以看到它。

  • Utilization of resources - 在每个程序中运行多个线程可以更好地利用CPU,并且CPU的空闲时间变得更少。

  • Sharing of data - 每个线程不需要额外的空间,因为程序中的线程可以共享相同的数据。

多线程的缺点

现在让我们看一下多线程的一些缺点。 缺点如下 -

  • Not suitable for single processor system - 与多处理器系统的性能相比,多线程发现难以在单处理器系统上实现计算速度方面的性能。

  • Issue of security - 我们知道程序中的所有线程共享相同的数据,因此始终存在安全问题,因为任何未知线程都可以更改数据。

  • Increase in complexity - 多线程可能会增加程序的复杂性,并且调试变得困难。

  • Lead to deadlock state - 多线程可能导致程序达到死锁状态的潜在风险。

  • Synchronization required - Synchronization required同步以避免互斥。 这会导致更多内存和CPU利用率。

Implementation of Threads

在本章中,我们将学习如何在Python中实现线程。

用于线程实现的Python模块

Python线程有时被称为轻量级进程,因为线程占用的内存比进程少得多。 线程允许一次执行多个任务。 在Python中,我们有以下两个在程序中实现线程的模块 -

  • 《_thread》 module

  • 《threading》 module

这两个模块之间的主要区别在于《_thread》模块将线程视为一个函数,而《threading》模块将每个线程视为一个对象,并以面向对象的方式实现它。 此外, 《_thread》模块在低级线程中是有效的,并且具有比《threading》模块更少的能力。

《_thread》 module

在Python的早期版本中,我们有《thread》模块,但在很长一段时间内它被认为是“已弃用”。 鼓励用户使用《threading》模块。 因此,在Python 3中,模块“thread”不再可用。 它已被重命名为“ 《_thread》 ”以解决《_thread》中的向后不兼容问题。

要在《_thread》模块的帮助下生成新线程,我们需要调用它的start_new_thread方法。 借助以下语法可以理解此方法的工作原理 -

_thread.start_new_thread ( function, args[, kwargs] )

在这里 -

  • args是一个参数元组

  • kwargs是关键字参数的可选字典

如果我们想在不传递参数的情况下调用函数,那么我们需要在args使用一个空元组args

此方法调用立即返回,子线程启动,并使用传递的args列表(如果有)调用函数。 线程在函数返回时终止。

例子 (Example)

以下是使用《_thread》模块生成新线程的示例。 我们在这里使用start_new_thread()方法。

import _thread
import time
def print_time( threadName, delay):
   count = 0
   while count < 5:
      time.sleep(delay)
      count += 1
      print ("%s: %s" % ( threadName, time.ctime(time.time()) ))
try:
   _thread.start_new_thread( print_time, ("Thread-1", 2, ) )
   _thread.start_new_thread( print_time, ("Thread-2", 4, ) )
except:
   print ("Error: unable to start thread")
while 1:
   pass

输出 (Output)

以下输出将帮助我们在《_thread》模块的帮助下理解新线程的生成。

Thread-1: Mon Apr 23 10:03:33 2018
Thread-2: Mon Apr 23 10:03:35 2018
Thread-1: Mon Apr 23 10:03:35 2018
Thread-1: Mon Apr 23 10:03:37 2018
Thread-2: Mon Apr 23 10:03:39 2018
Thread-1: Mon Apr 23 10:03:39 2018
Thread-1: Mon Apr 23 10:03:41 2018
Thread-2: Mon Apr 23 10:03:43 2018
Thread-2: Mon Apr 23 10:03:47 2018
Thread-2: Mon Apr 23 10:03:51 2018

《threading》 module

《threading》模块以面向对象的方式实现,并将每个线程视为对象。 因此,它为线程提供了比“_thread”模块更强大,更高级的支持。 该模块包含在Python 2.4中。

模块中的其他方法

《threading》模块包含《_thread》模块的所有方法,但它也提供了其他方法。 其他方法如下 -

  • threading.activeCount() - 此方法返回活动的线程对象的数量

  • threading.currentThread() - 此方法返回调用者线程控件中的线程对象数。

  • threading.enumerate() - 此方法返回当前活动的所有线程对象的列表。

  • 为了实现线程, 《threading》模块具有Thread类,它提供以下方法 -

    • run() - run()方法是线程的入口点。

    • start() - start()方法通过调用run方法启动一个线程。

    • join([time]) - join()等待线程终止。

    • isAlive() - isAlive()方法检查线程是否仍在执行。

    • getName() - getName()方法返回线程的名称。

    • setName() - setName()方法设置线程的名称。

如何使用模块创建线程?

在本节中,我们将学习如何使用《threading》模块创建线程。 按照以下步骤使用“线程”模块创建新线程 -

  • Step 1 - 在这一步中,我们需要定义Thread类的新子类。

  • Step 2 - 然后,为了添加其他参数,我们需要覆盖__init__(self [,args])方法。

  • Step 3 - 在这一步中,我们需要覆盖run(self [,args])方法来实现线程在启动时应该做的事情。

  • 现在,在创建新的Thread子类之后,我们可以创建它的一个实例,然后通过调用start()启动一个新线程, start()又调用run()方法。

例子 (Example)

考虑此示例以了解如何使用《threading》模块生成新线程。

import threading
import time
exitFlag = 0
class myThread (threading.Thread):
   def __init__(self, threadID, name, counter):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.counter = counter
   def run(self):
      print ("Starting " + self.name)
      print_time(self.name, self.counter, 5)
      print ("Exiting " + self.name)
def print_time(threadName, delay, counter):
   while counter:
      if exitFlag:
         threadName.exit()
      time.sleep(delay)
      print ("%s: %s" % (threadName, time.ctime(time.time())))
      counter -= 1
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print ("Exiting Main Thread")
Starting Thread-1
Starting Thread-2

输出 (Output)

现在,考虑以下输出 -

Thread-1: Mon Apr 23 10:52:09 2018
Thread-1: Mon Apr 23 10:52:10 2018
Thread-2: Mon Apr 23 10:52:10 2018
Thread-1: Mon Apr 23 10:52:11 2018
Thread-1: Mon Apr 23 10:52:12 2018
Thread-2: Mon Apr 23 10:52:12 2018
Thread-1: Mon Apr 23 10:52:13 2018
Exiting Thread-1
Thread-2: Mon Apr 23 10:52:14 2018
Thread-2: Mon Apr 23 10:52:16 2018
Thread-2: Mon Apr 23 10:52:18 2018
Exiting Thread-2
Exiting Main Thread

各种线程状态的Python程序

有五种线程状态 - 新的,可运行的,运行的,等待的和死的。 在这五个中,我们主要关注三个州 - 跑步,等待和死亡。 线程将其资源置于运行状态,等待处于等待状态的资源; 资源的最终版本,如果执行和获取处于死亡状态。

以下Python程序在start(),sleep()和join()方法的帮助下将分别显示线程如何进入running,waiting和dead状态。

Step 1 - 导入必要的模块,“线程”和“时间”

import threading
import time

Step 2 - 定义一个函数,在创建线程时将调用该函数。

def thread_states():
   print("Thread entered in running state")

Step 3 - 我们使用time模块的sleep()方法让我们的线程等待2秒钟。

time.sleep(2)

Step 4 - 现在,我们创建一个名为T1的线程,它接受上面定义的函数的参数。

T1 = threading.Thread(target=thread_states)

Step 5 - 现在,在start()函数的帮助下,我们可以启动我们的线程。 它将生成消息,该消息由我们在定义函数时设置。

T1.start()
Thread entered in running state

Step 6 - 现在,最后我们可以在完成执行后使用join()方法终止线程。

T1.join()

在Python中启动一个线程

在python中,我们可以通过不同的方式启动一个新线程,但其中最简单的一个是将它定义为单个函数。 定义函数后,我们可以将其作为新的threading.Thread对象的目标传递给它。 执行以下Python代码以了解该函数的工作原理 -

import threading
import time
import random
def Thread_execution(i):
   print("Execution of Thread {} started\n".format(i))
   sleepTime = random.randint(1,4)
   time.sleep(sleepTime)
   print("Execution of Thread {} finished".format(i))
for i in range(4):
   thread = threading.Thread(target=Thread_execution, args=(i,))
   thread.start()
   print("Active Threads:" , threading.enumerate())

输出 (Output)

Execution of Thread 0 started
Active Threads:
   [<_MainThread(MainThread, started 6040)>,
      <HistorySavingThread(IPythonHistorySavingThread, started 5968)>,
      <Thread(Thread-3576, started 3932)>]
Execution of Thread 1 started
Active Threads:
   [<_MainThread(MainThread, started 6040)>,
      <HistorySavingThread(IPythonHistorySavingThread, started 5968)>,
      <Thread(Thread-3576, started 3932)>,
      <Thread(Thread-3577, started 3080)>]
Execution of Thread 2 started
Active Threads:
   [<_MainThread(MainThread, started 6040)>,
      <HistorySavingThread(IPythonHistorySavingThread, started 5968)>,
      <Thread(Thread-3576, started 3932)>,
      <Thread(Thread-3577, started 3080)>,
      <Thread(Thread-3578, started 2268)>]
Execution of Thread 3 started
Active Threads:
   [<_MainThread(MainThread, started 6040)>,
      <HistorySavingThread(IPythonHistorySavingThread, started 5968)>,
      <Thread(Thread-3576, started 3932)>,
      <Thread(Thread-3577, started 3080)>,
      <Thread(Thread-3578, started 2268)>,
      <Thread(Thread-3579, started 4520)>]
Execution of Thread 0 finished
Execution of Thread 1 finished
Execution of Thread 2 finished
Execution of Thread 3 finished

Python中的守护程序线程

在Python中实现守护程序线程之前,我们需要了解守护程序线程及其用法。 在计算方面,守护进程是一个后台进程,它处理各种服务的请求,例如数据发送,文件传输等。如果不再需要它,它将处于休眠状态。 同样的任务也可以在非守护程序线程的帮助下完成。 但是,在这种情况下,主线程必须手动跟踪非守护程序线程。 另一方面,如果我们使用守护程序线程,那么主线程可以完全忘记这一点,它将在主线程退出时被终止。 关于守护程序线程的另一个重要观点是,我们可以选择仅将它们用于非必要任务,如果它们没有完成或在它们之间被杀死则不会影响我们。 以下是python中守护程序线程的实现 -

import threading
import time
def nondaemonThread():
   print("starting my thread")
   time.sleep(8)
   print("ending my thread")
def daemonThread():
   while True:
   print("Hello")
   time.sleep(2)
if __name__ == '__main__':
   nondaemonThread = threading.Thread(target = nondaemonThread)
   daemonThread = threading.Thread(target = daemonThread)
   daemonThread.setDaemon(True)
   daemonThread.start()
   nondaemonThread.start()

在上面的代码中,有两个函数,即》nondaemonThread()》daemonThread() 。 第一个函数打印其状态并在8秒后休眠,而deamonThread()函数每2秒无限期地打印Hello。 我们可以通过以下输出了解nondaemon和daemon线程之间的区别 -

Hello
starting my thread
Hello
Hello
Hello
Hello
ending my thread
Hello
Hello
Hello
Hello
Hello

Synchronizing Threads

线程同步可以被定义为一种方法,借助于该方法,我们可以确保两个或更多并发线程不同时访问称为临界区的程序段。 另一方面,正如我们所知,临界区是访问共享资源的程序的一部分。 因此,我们可以说同步是通过同时访问资源来确保两个或多个线程不相互连接的过程。 下图显示了四个线程同时尝试访问程序的关键部分。

同步

为了更清楚,假设有两个或更多线程试图同时在列表中添加对象。 此行为无法导致成功结束,因为它将丢弃一个或所有对象,否则将完全破坏列表的状态。 这里同步的作用是一次只有一个线程可以访问列表。

线程同步中的问题

在实现并发编程或应用同步原语时,我们可能会遇到问题。 在本节中,我们将讨论两个主要问题。 问题是 -

  • Deadlock
  • Race condition

比赛条件

这是并发编程中的主要问题之一。 对共享资源的并发访问可能导致竞争条件。 竞争条件可以定义为当两个或多个线程可以访问共享数据然后尝试同时更改其值时发生的条件。 因此,变量的值可能是不可预测的,并且取决于过程的上下文切换的定时而变化。

例子 (Example)

考虑这个例子来理解竞争条件的概念 -

Step 1 - 在这一步中,我们需要导入线程模块 -

import threading

Step 2 - 现在,定义一个全局变量,比如x,以及它的值为0 -

x = 0

Step 3 - 现在,我们需要定义increment_global()函数,它将在此全局函数x中增加1 -

def increment_global():
   global x
   x += 1

Step 4 - 在这一步中,我们将定义taskofThread()函数,该函数将调用increment_global()函数指定的次数; 对于我们的例子它是50000次 -

def taskofThread():
   for _ in range(50000):
      increment_global()

Step 5 - 现在,定义main()函数,其中创建了线程t1和t2。 两者都将在start()函数的帮助下启动,并等到他们在join()函数的帮助下完成工作。

def main():
   global x
   x = 0
   t1 = threading.Thread(target= taskofThread)
   t2 = threading.Thread(target= taskofThread)
   t1.start()
   t2.start()
   t1.join()
   t2.join()

Step 6 - 现在,我们需要给出我们想要调用main()函数的迭代次数。 在这里,我们称它为5次。

if __name__ == "__main__":
   for i in range(5):
      main()
      print("x = {1} after Iteration {0}".format(i,x))

在下面显示的输出中,我们可以看到竞争条件的影响,因为每次迭代后x的值预计为100000.但是,值有很多变化。 这是由于线程并发访问共享全局变量x。

输出 (Output)

x = 100000 after Iteration 0
x = 54034 after Iteration 1
x = 80230 after Iteration 2
x = 93602 after Iteration 3
x = 93289 after Iteration 4

使用锁处理竞争条件

正如我们在上面的程序中看到竞争条件的影响,我们需要一个同步工具,它可以处理多个线程之间的竞争条件。 在Python中, 《threading》模块提供Lock类来处理竞争条件。 此外, Lock类提供了不同的方法,我们可以帮助处理多个线程之间的竞争条件。 方法如下所述 -

acquire() method

该方法用于获取,即阻止锁定。 锁定可以是阻塞或非阻塞,具体取决于以下真值或假值 -

  • With value set to True - 如果使用True(默认参数)调用acquire()方法,则会阻止线程执行,直到解锁为止。

  • With value set to False - 如果使用False调用acquire()方法(这不是默认参数),则在将其设置为true(即直到锁定)之前,不会阻止线程执行。

release() method

此方法用于释放锁定。 以下是与此方法相关的一些重要任务 -

  • 如果锁被锁定,那么release()方法将解锁它。 它的工作是,如果多个线程被阻塞并等待锁被解锁,则只允许一个线程继续进行。

  • 如果锁已经解锁,它将引发ThreadError

现在,我们可以使用lock类及其方法重写上述程序,以避免竞争条件。 我们需要使用lock参数定义taskofThread()方法,然后需要使用acquire()和release()方法来阻止和非阻塞锁以避免竞争条件。

例子 (Example)

以下是了解用于处理竞争条件的锁概念的python程序示例 -

import threading
x = 0
def increment_global():
   global x
   x += 1
def taskofThread(lock):
   for _ in range(50000):
      lock.acquire()
      increment_global()
      lock.release()
def main():
   global x
   x = 0
   lock = threading.Lock()
   t1 = threading.Thread(target = taskofThread, args = (lock,))
   t2 = threading.Thread(target = taskofThread, args = (lock,))
   t1.start()
   t2.start()
   t1.join()
   t2.join()
if __name__ == "__main__":
   for i in range(5):
      main()
      print("x = {1} after Iteration {0}".format(i,x))

以下输出表明忽略了竞争条件的影响; 因为x的值,在每次迭代之后,现在是100000,这符合该程序的期望。

输出 (Output)

x = 100000 after Iteration 0
x = 100000 after Iteration 1
x = 100000 after Iteration 2
x = 100000 after Iteration 3
x = 100000 after Iteration 4

僵局 - 餐饮哲学家的问题

死锁是设计并发系统时可能遇到的麻烦问题。 我们可以借助餐饮哲学家的问题来说明这个问题如下 -

Edsger Dijkstra最初介绍了餐饮哲学家的问题,其中一个着名的插图是并发系统中最大的问题之一,称为死锁。

在这个问题上,有五位着名的哲学家坐在圆桌旁,从他们的碗里吃一些食物。 五个哲学家可以使用五把叉来吃他们的食物。 然而,哲学家们决定同时使用两把叉来吃他们的食物。

现在,哲学家有两个主要条件。 首先,每个哲学家可以处于进食状态或处于思维状态,其次,他们必须首先获得两个分叉,即左右分支。 当五位哲学家中的每一位都设法同时选择左叉时,问题就出现了。 现在他们都在等待正确的叉子自由,但他们永远不会放弃他们的叉子,直到他们吃了他们的食物并且正确的叉子永远不可用。 因此,餐桌上会出现死锁状态。

并发系统中的死锁

现在,如果我们看到,我们的并发系统也会出现同样的问题。 上例中的分支将是系统资源,每个哲学家都可以代表竞争获取资源的过程。

使用Python程序解决方案

通过将哲学家分为两类 - greedy philosophersgenerous philosophers可以找到这个问题的解决方案。 主要是一个贪婪的哲学家将尝试拿起左叉并等到它在那里。 然后他会等待正确的叉子在那里,捡起来吃,然后把它放下。 另一方面,一个慷慨的哲学家会试图拿起左叉,如果不存在,他会等一段时间再试一次。 如果他们得到左叉,那么他们会尝试找到合适的叉子。 如果他们也会获得正确的叉子,那么他们就会吃掉并释放两个叉子。 但是,如果他们不能获得正确的分叉,那么他们将释放左分叉。

例子 (Example)

以下Python程序将帮助我们找到餐饮哲学家问题的解决方案 -

import threading
import random
import time
class DiningPhilosopher(threading.Thread):
   running = True
   def __init__(self, xname, Leftfork, Rightfork):
   threading.Thread.__init__(self)
   self.name = xname
   self.Leftfork = Leftfork
   self.Rightfork = Rightfork
   def run(self):
   while(self.running):
      time.sleep( random.uniform(3,13))
      print ('%s is hungry.' % self.name)
      self.dine()
   def dine(self):
   fork1, fork2 = self.Leftfork, self.Rightfork
   while self.running:
      fork1.acquire(True)
      locked = fork2.acquire(False)
	  if locked: break
      fork1.release()
      print ('%s swaps forks' % self.name)
      fork1, fork2 = fork2, fork1
   else:
      return
   self.dining()
   fork2.release()
   fork1.release()
   def dining(self):
   print ('%s starts eating '% self.name)
   time.sleep(random.uniform(1,10))
   print ('%s finishes eating and now thinking.' % self.name)
def Dining_Philosophers():
   forks = [threading.Lock() for n in range(5)]
   philosopherNames = ('1st','2nd','3rd','4th', '5th')
   philosophers= [DiningPhilosopher(philosopherNames[i], forks[i%5], forks[(i+1)%5]) \
      for i in range(5)]
   random.seed()
   DiningPhilosopher.running = True
   for p in philosophers: p.start()
   time.sleep(30)
   DiningPhilosopher.running = False
   print (" It is finishing.")
Dining_Philosophers()

上述计划使用了贪婪和慷慨的哲学家的概念。 该程序还使用了《threading》模块的Lock类的acquire()release()方法。 我们可以在以下输出中看到解决方案 -

输出 (Output)

4th is hungry.
4th starts eating
1st is hungry.
1st starts eating
2nd is hungry.
5th is hungry.
3rd is hungry.
1st finishes eating and now thinking.3rd swaps forks
2nd starts eating
4th finishes eating and now thinking.
3rd swaps forks5th starts eating
5th finishes eating and now thinking.
4th is hungry.
4th starts eating
2nd finishes eating and now thinking.
3rd swaps forks
1st is hungry.
1st starts eating
4th finishes eating and now thinking.
3rd starts eating
5th is hungry.
5th swaps forks
1st finishes eating and now thinking.
5th starts eating
2nd is hungry.
2nd swaps forks
4th is hungry.
5th finishes eating and now thinking.
3rd finishes eating and now thinking.
2nd starts eating 4th starts eating
It is finishing.

Threads Intercommunication

在现实生活中,如果一个团队正在开展一项共同任务,那么他们之间应该进行沟通以正确完成任务。 同样的类比也适用于线程。 在编程中,为了减少处理器的理想时间,我们创建多个线程并为每个线程分配不同的子任务。 因此,必须有一个通信设施,他们应该相互交互,以同步的方式完成工作。

考虑以下与线程互通有关的要点 -

  • No performance gain - 如果我们无法在线程和进程之间实现正确的通信,那么并发性和并行性带来的性能提升是没有用的。

  • Accomplish task properly - 如果线程之间没有适当的相互通信机制,则无法正确完成分配的任务。

  • More efficient than inter-process communication间通信更高效,更易于使用,因为进程内的所有线程共享相同的地址空间,并且不需要使用共享内存。

用于线程安全通信的Python数据结构

多线程代码会出现将信息从一个线程传递到另一个线程的问题。 标准通信原语无法解决此问题。 因此,我们需要实现自己的复合对象,以便在线程之间共享对象,以使通信成为线程安全的。 以下是一些数据结构,它们在对它们进行一些更改后提供了线程安全的通信 -

Sets

为了以线程安全的方式使用set数据结构,我们需要扩展set类来实现我们自己的锁定机制。

例子 (Example)

这是一个扩展类的Python示例 -

class extend_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(extend_class, self).__init__(*args, **kwargs)
   def add(self, elem):
      self._lock.acquire()
	  try:
      super(extend_class, self).add(elem)
      finally:
      self._lock.release()
   def delete(self, elem):
      self._lock.acquire()
      try:
      super(extend_class, self).delete(elem)
      finally:
      self._lock.release()

在上面的示例中,定义了一个名为extend_class的类对象,该对象继承自Python set class 。 在此类的构造函数中创建一个锁对象。 现在,有两个函数 - add()delete() 。 这些函数是定义的并且是线程安全的。 它们都依赖于super类功能和一个关键异常。

装饰员(Decorator)

这是线程安全通信的另一个关键方法,就是使用装饰器。

例子 (Example)

考虑一个Python示例,演示如何使用装饰器和mminus;

def lock_decorator(method):
   def new_deco_method(self, *args, **kwargs):
      with self._lock:
         return method(self, *args, **kwargs)
return new_deco_method
class Decorator_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(Decorator_class, self).__init__(*args, **kwargs)
   @lock_decorator
   def add(self, *args, **kwargs):
      return super(Decorator_class, self).add(elem)
   @lock_decorator
   def delete(self, *args, **kwargs):
      return super(Decorator_class, self).delete(elem)

在上面的示例中,定义了一个名为lock_decorator的装饰器方法,该方法继承自Python方法类。 然后在此类的构造函数中创建一个锁对象。 现在,有两个函数 - add()和delete()。 这些函数是定义的并且是线程安全的。 它们都依赖于超类功能和一个关键异常。

Lists

列表数据结构是线程安全的,快速的以及用于临时内存存储的简单结构。 在Cpython中,GIL可以防止对它们的并发访问。 我们开始知道列表是线程安全的但是它们中的数据呢。 实际上,列表的数据不受保护。 例如,如果另一个线程试图做同样的事情,则L.append(x)不保证返回预期的结果。 这是因为虽然append()是一个原子操作并且是线程安全的,但另一个线程试图以并发方式修改列表的数据,因此我们可以看到竞争条件对输出的副作用。

要解决此类问题并安全地修改数据,我们必须实现适当的锁定机制,这进一步确保多个线程不会潜在地遇到竞争条件。 为了实现正确的锁定机制,我们可以像前面的例子中那样扩展类。

列表上的其他一些原子操作如下 -

L.append(x)
L1.extend(L2)
x = L[i]
x = L.pop()
L1[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()

在这里 -

  • L,L1,L2 all are lists
  • D,D1,D2 are dicts
  • x,y are objects
  • i, j are ints

Queues

如果列表的数据不受保护,我们可能不得不面对后果。 我们可能会获取或删除错误的数据项,竞争条件。 这就是为什么建议使用队列数据结构。 一个真实的队列示例可以是单车道单向道路,车辆首先进入,首先退出。 可以在售票窗口和公共汽车站看到更多真实世界的例子。

队列

队列是默认的,线程安全的数据结构,我们不必担心实现复杂的锁定机制。 Python为我们提供了 模块在我们的应用程序中使用不同类型的队列。

队列类型

在本节中,我们将了解不同类型的队列。 Python提供了三个队列选项,可以从《queue》模块中使用 -

  • 正常队列(FIFO,先进先出)
  • LIFO,后进先出
  • Priority

我们将在后续章节中了解不同的队列。

正常队列(FIFO,先进先出)

它是Python提供的最常用的队列实现。 在这种排队机制中,无论谁先到,都会先获得服务。 FIFO也称为普通队列。 FIFO队列可以表示如下 -

FIFO

FIFO队列的Python实现

在python中,FIFO队列可以用单线程和多线程实现。

单线程FIFO队列

为了使用单线程实现FIFO队列, Queue类将实现一个基本的先进先出容器。 元素将使用put()添加到序列的一个“结尾”,并使用get()从另一端删除。

例子 (Example)

以下是用于实现单线程FIFO队列的Python程序 -

import queue
q = queue.Queue()
for i in range(8):
   q.put("item-" + str(i))
while not q.empty():
   print (q.get(), end = " ")

输出 (Output)

item-0 item-1 item-2 item-3 item-4 item-5 item-6 item-7

输出显示上面的程序使用单个线程来说明元素是按照它们插入的顺序从队列中删除的。

具有多个线程的FIFO队列

为了实现具有多个线程的FIFO,我们需要定义myqueue()函数,该函数从队列模块扩展。 在使用单线程实现FIFO队列时,get()和put()方法的工作与上面讨论的相同。 然后为了使它成为多线程,我们需要声明并实例化线程。 这些线程将以FIFO方式使用队列。

例子 (Example)

以下是用于实现具有多个线程的FIFO队列的Python程序

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
   item = queue.get()
   if item is None:
   break
   print("{} removed {} from the queue".format(threading.current_thread(), item))
   queue.task_done()
   time.sleep(2)
q = queue.Queue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

输出 (Output)

<Thread(Thread-3654, started 5044)> removed 0 from the queue
<Thread(Thread-3655, started 3144)> removed 1 from the queue
<Thread(Thread-3656, started 6996)> removed 2 from the queue
<Thread(Thread-3657, started 2672)> removed 3 from the queue
<Thread(Thread-3654, started 5044)> removed 4 from the queue

LIFO,后进先出队列

该队列使用与FIFO(先进先出)队列完全相反的类比。 在这个排队机制中,最后一个,将首先获得服务。 这类似于实现堆栈数据结构。 LIFO队列在实现深度优先搜索(如人工智能算法)时非常有用。

Python实现LIFO队列

在python中,LIFO队列可以用单线程和多线程实现。

单线程的LIFO队列

为了使用单线程实现LIFO队列, Queue类将使用结构Queue .LifoQueue实现基本的后进先出容器。 现在,在调用put() ,元素将添加到容器的头部,并在使用get()从头部移除。

例子 (Example)

以下是用于使用单线程实现LIFO队列的Python程序 -

import queue
q = queue.LifoQueue()
for i in range(8):
   q.put("item-" + str(i))
while not q.empty():
   print (q.get(), end=" ")
Output:
item-7 item-6 item-5 item-4 item-3 item-2 item-1 item-0

输出显示上述程序使用单个线程来说明元素是按照它们插入的相反顺序从队列中删除的。

具有多个线程的LIFO队列

实现类似于我们已经完成了具有多个线程的FIFO队列的实现。 唯一的区别是我们需要使用Queue类,它将使用结构Queue.LifoQueue实现基本的Queue.LifoQueue先出容器。

例子 (Example)

以下是用于实现具有多个线程的LIFO队列的Python程序 -

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
	  print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(2)
q = queue.LifoQueue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join() 

输出 (Output)

<Thread(Thread-3882, started 4928)> removed 4 from the queue
<Thread(Thread-3883, started 4364)> removed 3 from the queue
<Thread(Thread-3884, started 6908)> removed 2 from the queue
<Thread(Thread-3885, started 3584)> removed 1 from the queue
<Thread(Thread-3882, started 4928)> removed 0 from the queue

优先队列

在FIFO和LIFO队列中,项目的顺序与插入顺序有关。 但是,在许多情况下,优先级比插入顺序更重要。 让我们考虑一个现实世界的例子。 假设机场的安检正在检查不同类别的人。 可以优先检查VVIP的人员,航空公司的工作人员,海关官员,类别,而不是像对待普通人那样在到达的基础上进行检查。

优先级队列需要考虑的另一个重要方面是如何开发任务调度程序。 一种常见的设计是在队列中优先处理大多数代理任务。 此数据结构可用于根据队列的优先级值从队列中获取项目。

Python优先级队列的实现

在python中,优先级队列可以用单线程和多线程实现。

单线程的优先级队列

为了实现具有单线程的优先级队列, Queue类将使用结构Queue .PriorityQueue在优先级容器上实现任务。 现在,在调用put() ,元素将添加一个值,其中最低值将具有最高优先级,因此首先使用get()检索。

例子 (Example)

考虑使用以下Python程序实现具有单线程的优先级队列 -

import queue as Q
p_queue = Q.PriorityQueue()
p_queue.put((2, 'Urgent'))
p_queue.put((1, 'Most Urgent'))
p_queue.put((10, 'Nothing important'))
prio_queue.put((5, 'Important'))
while not p_queue.empty():
   item = p_queue.get()
   print('%s - %s' % item)

输出 (Output)

1 – Most Urgent
2 - Urgent
5 - Important
10 – Nothing important

在上面的输出中,我们可以看到队列已经根据优先级存储了项目 - 较少的值具有高优先级。

具有多线程的优先级队列

该实现类似于具有多个线程的FIFO和LIFO队列的实现。 唯一的区别是我们需要使用Queue类来使用结构Queue.PriorityQueue来初始化优先级。 另一个区别在于生成队列的方式。 在下面给出的示例中,将使用两个相同的数据集生成它。

例子 (Example)

以下Python程序有助于实现具有多个线程的优先级队列 -

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(1)
q = queue.PriorityQueue()
for i in range(5):
   q.put(i,1)
for i in range(5):
   q.put(i,1)
threads = []
for i in range(2):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

输出 (Output)

<Thread(Thread-4939, started 2420)> removed 0 from the queue
<Thread(Thread-4940, started 3284)> removed 0 from the queue
<Thread(Thread-4939, started 2420)> removed 1 from the queue
<Thread(Thread-4940, started 3284)> removed 1 from the queue
<Thread(Thread-4939, started 2420)> removed 2 from the queue
<Thread(Thread-4940, started 3284)> removed 2 from the queue
<Thread(Thread-4939, started 2420)> removed 3 from the queue
<Thread(Thread-4940, started 3284)> removed 3 from the queue
<Thread(Thread-4939, started 2420)> removed 4 from the queue
<Thread(Thread-4940, started 3284)> removed 4 from the queue

Testing Thread Applications

在本章中,我们将学习线程应用程序的测试。 我们还将了解测试的重要性。

为什么要测试?

在我们深入讨论测试的重要性之前,我们需要知道测试的内容。 一般来说,测试是一种了解某些东西是如何运作的技术。 另一方面,特别是如果我们谈论计算机程序或软件,那么测试就是访问软件程序功能的技术。

在本节中,我们将讨论软件测试的重要性。 在软件开发中,必须在向客户端发布软件之前进行双重检查。 这就是由经验丰富的测试团队测试软件非常重要的原因。 请考虑以下几点来理解软件测试的重要性 -

提高软件质量

当然,没有公司想要提供低质量的软件,也没有客户想购买低质量的软件。 测试通过查找和修复其中的错误来提高软件的质量。

客户满意度

任何业务中最重要的部分是客户的满意度。 通过提供无错误和优质软件,公司可以实现客户满意度。

减轻新功能的影响

假设我们已经建立了一个10000行的软件系统,我们需要添加一个新功能,然后开发团队就会担心这个新功能对整个软件的影响。 在这里,测试起着至关重要的作用,因为如果测试团队已经完成了一系列测试,那么它可以使我们免于任何潜在的灾难性中断。

用户体验

任何业务中另一个最重要的部分是该产品用户的体验。 只有测试可以确保最终用户发现它简单易用。

减少开支

测试可以通过在开发的测试阶段找到并修复错误而不是在交付后修复它来降低软件的总成本。 如果在交付软件后出现重大错误,那么就客户不满意,公司负面声誉等方面的费用和无形成本而言,会增加其实际成本。

测试什么?

始终建议您对要测试的内容有适当的了解。 在本节中,我们将首先了解测试任何软件时测试人员的主要动机。 代码覆盖率,即我们的测试套件在测试时会遇到多少行代码,应该避免。 这是因为,在测试时,仅关注代码行数对我们的系统没有任何实际价值。 可能仍然存在一些错误,即使在部署之后,这些错误也会在稍后阶段反映出来。

考虑以下与测试内容相关的重要观点 -

  • 我们需要专注于测试代码的功能而不是代码覆盖率。

  • 我们需要首先测试代码中最重要的部分,然后转向代码中不太重要的部分。 这肯定会节省时间。

  • 测试人员必须具有多种不同的测试,可以将软件推向极限。

测试并发软件程序的方法

由于能够利用多核架构的真正能力,并发软件系统正在取代顺序系统。 最近,并发系统程序被用于从手机到洗衣机,从汽车到飞机等各种各样的系统程序。我们需要更加小心地测试并发软件程序,因为如果我们已经为单线程应用程序添加了多个线程已经是一个错误,那么我们最终会遇到多个错误。

并发软件程序的测试技术广泛关注于选择交错,这种交错暴露了可能有害的模式,如竞争条件,死锁和违反原子性。 以下是测试并发软件程序的两种方法 -

系统探索

该方法旨在尽可能广泛地探索交错的空间。 这种方法可以采用蛮力技术,其他方法采用偏序降阶技术或启发式技术来探索交织空间。

Property-driven

属性驱动的方法依赖于观察到并发错误更可能发生在暴露特定属性(例如可疑内存访问模式)的交错下。 不同的属性驱动方法针对不同的错误,如竞争条件,死锁和违反原子性,这进一步取决于一个或其他特定属性。

测试策略

测试策略也称为测试方法。 该策略定义了如何进行测试。 测试方法有两种技术 -

主动(Proactive)

一种方法,尽可能早地启动测试设计过程,以便在创建构建之前找到并修复缺陷。

Reactive

一种方法,在开发过程完成之前,测试不会开始。

在对python程序应用任何测试策略或方法之前,我们必须对软件程序可能具有的错误类型有一个基本的想法。 错误如下 -

语法错误

在程序开发期间,可能会出现许多小错误。 错误主要是由于输入错误。 例如,缺少冒号或关键字的拼写错误等。此类错误是由于程序语法错误而非逻辑错误造成的。 因此,这些错误称为语法错误。

语义错误

语义错误也称为逻辑错误。 如果软件程序中存在逻辑或语义错误,则语句将正确编译并运行,但由于逻辑不正确,因此无法提供所需的输出。

单元测试 (Unit Testing)

这是测试python程序最常用的测试策略之一。 此策略用于测试代码的单元或组件。 按单位或组件,我们指的是代码的类或函数。 单元测试通过测试“小”单元简化了大型编程系统的测试。 借助于上述概念,单元测试可以被定义为一种方法,其中测试各个源代码单元以确定它们是否返回所需的输出。

在随后的章节中,我们将了解用于单元测试的不同Python模块。

unittest模块

单元测试的第一个模块是unittest模块。 它受JUnit的启发,默认情况下包含在Python3.6中。 它支持测试自动化,共享测试的设置和关闭代码,将测试聚合到集合中,以及测试与报告框架的独立性。

以下是unittest模块支持的一些重要概念

文字夹具

它用于设置测试,以便在开始测试之前运行并在测试结束后拆除。 它可能涉及在开始测试之前创建临时数据库,目录等。

测试用例

测试用例检查所需的响应是否来自特定的输入集。 unittest模块包含一个名为TestCase的基类,可用于创建新的测试用例。 它包含两个默认方法 -

  • setUp() - 一种用于在运行之前设置测试夹具的钩子方法。 在调用实现的测试方法之前调用它。

  • tearDown( - 用于在类中运行所有测试后解构类夹具的钩子方法。

测试套件

它是测试套件,测试用例或两者的集合。

试验跑步者

它控制测试用例或套装的运行,并为用户提供结果。 它可以使用GUI或简单的文本界面来提供结果。

Example

以下Python程序使用unittest模块来测试名为Fibonacci的模块。 该程序有助于计算一个数字的斐波那契数列。 在此示例中,我们创建了一个名为Fibo_test的类,以使用不同的方法定义测试用例。 这些方法继承自unittest.TestCase。 我们使用两种默认方法--setUp()和tearDown()。 我们还定义了testfibocal方法。 必须使用字母测试开始测试的名称。 在最后一个块中,unittest.main()为测试脚本提供了一个命令行界面。

import unittest
def fibonacci(n):
   a, b = 0, 1
   for i in range(n):
   a, b = b, a + b
   return a
class Fibo_Test(unittest.TestCase):
   def setUp(self):
   print("This is run before our tests would be executed")
   def tearDown(self):
   print("This is run after the completion of execution of our tests")
   def testfibocal(self):
   self.assertEqual(fib(0), 0)
   self.assertEqual(fib(1), 1)
   self.assertEqual(fib(5), 5)
   self.assertEqual(fib(10), 55)
   self.assertEqual(fib(20), 6765)
if __name__ == "__main__":
   unittest.main()

从命令行运行时,上面的脚本会生成一个如下所示的输出 -

输出 (Output)

This runs before our tests would be executed.
This runs after the completion of execution of our tests.
.
----------------------------------------------------------------------
Ran 1 test in 0.006s
OK

现在,为了更清楚,我们正在改变我们的代码,这有助于定义Fibonacci模块。

以下面的代码块为例 -

def fibonacci(n):
   a, b = 0, 1
   for i in range(n):
   a, b = b, a + b
   return a

对代码块进行了一些更改,如下所示 -

def fibonacci(n):
   a, b = 1, 1
   for i in range(n):
   a, b = b, a + b
   return a

现在,在使用更改的代码运行脚本后,我们将获得以下输出 -

This runs before our tests would be executed.
This runs after the completion of execution of our tests.
F
======================================================================
FAIL: testCalculation (__main__.Fibo_Test)
----------------------------------------------------------------------
Traceback (most recent call last):
File "unitg.py", line 15, in testCalculation
self.assertEqual(fib(0), 0)
AssertionError: 1 != 0
----------------------------------------------------------------------
Ran 1 test in 0.007s
FAILED (failures = 1)

上面的输出显示模块未能提供所需的输出。

Docktest模块

docktest模块还有助于单元测试。 它还预先包装了python。 它比unittest模块更容易使用。 unittest模块更适合复杂的测试。 要使用doctest模块,我们需要导入它。 相应函数的docstring必须具有交互式python会话及其输出。

如果我们的代码中的一切都很好,那么docktest模块就没有输出; 否则,它将提供输出。

例子 (Example)

以下Python示例使用docktest模块测试名为Fibonacci的模块,该模块有助于计算数字的Fibonacci系列。

import doctest
def fibonacci(n):
   """
   Calculates the Fibonacci number
   >>> fibonacci(0)
   0
   >>> fibonacci(1)
   1
   >>> fibonacci(10)
   55
   >>> fibonacci(20)
   6765
   >>>
   """
   a, b = 1, 1
   for i in range(n):
   a, b = b, a + b
   return a
      if __name__ == "__main__":
   doctest.testmod()

我们可以看到名为fib的相应函数的docstring具有交互式python会话以及输出。 如果我们的代码没问题,那么doctest模块就没有输出。 但要了解它是如何工作的,我们可以使用-v选项运行它。

(base) D:\ProgramData>python dock_test.py -v
Trying:
   fibonacci(0)
Expecting:
   0
ok
Trying:
   fibonacci(1)
Expecting:
   1
ok
Trying:
   fibonacci(10)
Expecting:
   55
ok
Trying:
   fibonacci(20)
Expecting:
   6765
ok
1 items had no tests:
   __main__
1 items passed all tests:
4 tests in __main__.fibonacci
4 tests in 2 items.
4 passed and 0 failed.
Test passed.

现在,我们将更改有助于定义Fibonacci模块的代码

以下面的代码块为例 -

def fibonacci(n):
   a, b = 0, 1
   for i in range(n):
   a, b = b, a + b
   return a

以下代码块有助于更改 -

def fibonacci(n):
   a, b = 1, 1
   for i in range(n):
   a, b = b, a + b
   return a

在没有-v选项的情况下运行脚本后,使用更改的代码,我们将获得如下所示的输出。

输出 (Output)

(base) D:\ProgramData>python dock_test.py
**********************************************************************
File "unitg.py", line 6, in __main__.fibonacci
Failed example:
   fibonacci(0)
Expected:
   0
Got:
   1
**********************************************************************
File "unitg.py", line 10, in __main__.fibonacci
Failed example:
   fibonacci(10)
Expected:
   55
Got:
   89
**********************************************************************
File "unitg.py", line 12, in __main__.fibonacci
Failed example:
   fibonacci(20)
Expected:
   6765
Got:
   10946
**********************************************************************
1 items had failures:
   3 of 4 in __main__.fibonacci
***Test Failed*** 3 failures.

我们可以在上面的输出中看到三个测试都失败了。

Debugging Thread Applications

在本章中,我们将学习如何调试线程应用程序。 我们还将了解调试的重要性。

什么是调试?

在计算机编程中,调试是从计算机程序中查找和删除错误,错误和异常的过程。 一旦代码被编写就开始该过程,并且随着代码与其他编程单元组合以形成软件产品,该过程在连续阶段中继续。 调试是软件测试过程的一部分,是整个软件开发生命周期中不可或缺的一部分。

Python Debugger

Python调试器或pdb是Python标准库的一部分。 它是一个很好的后备工具,用于跟踪难以发现的错误,并允许我们快速可靠地修复错误代码。 以下是pdp调试器的两个最重要的任务 -

  • 它允许我们在运行时检查变量的值。
  • 我们也可以单步执行代码并设置断点。

我们可以通过以下两种方式使用pdb -

  • 通过命令行; 这也称为事后调试。
  • By interactively running pdb.

使用pdb

要使用Python调试器,我们需要在我们想要进入调试器的位置使用以下代码 -

import pdb;
pdb.set_trace()

请考虑以下命令以通过命令行使用pdb。

  • h(help)
  • d(down)
  • u(up)
  • b(break)
  • cl(clear)
  • l(list))
  • n(next))
  • c(continue)
  • s(step)
  • r(return))
  • b(break)

以下是Python调试器的h(help)命令的演示 -

import pdb
pdb.set_trace()
--Call--
>d:\programdata\lib\site-packages\ipython\core\displayhook.py(247)__call__()
-> def __call__(self, result = None):
(Pdb) h
Documented commands (type help <topic>):
========================================
EOF   c         d       h        list     q       rv      undisplay
a     cl        debug   help     ll       quit    s       unt
alias clear     disable ignore   longlist r       source  until
args  commands  display interact n        restart step    up
b     condition down    j        next     return  tbreak  w
break cont      enable  jump     p        retval  u       whatis
bt    continue  exit    l        pp       run     unalias where
Miscellaneous help topics:
==========================
exec pdb

例子 (Example)

在使用Python调试器时,我们可以使用以下行在脚本中的任何位置设置断点 -

import pdb;
pdb.set_trace()

设置断点后,我们可以正常运行脚本。 脚本将执行到某一点; 直到设置了一条线。 考虑以下示例,我们将在脚本中的不同位置使用上述行来运行脚本 -

import pdb;
a = "aaa"
pdb.set_trace()
b = "bbb"
c = "ccc"
final = a + b + c
print (final)

当运行上面的脚本时,它将执行程序直到a =“aaa”,我们可以在下面的输出中检查它。

输出 (Output)

--Return--
> <ipython-input-7-8a7d1b5cc854>(3)<module>()->None
-> pdb.set_trace()
(Pdb) p a
'aaa'
(Pdb) p b
*** NameError: name 'b' is not defined
(Pdb) p c
*** NameError: name 'c' is not defined

在pdb中使用命令'p(print)'后,此脚本仅打印'aaa'。 之后是错误,因为我们已将断点设置为a =“aaa”。

同样,我们可以通过更改断点来运行脚本并查看输出中的差异 -

import pdb
a = "aaa"
b = "bbb"
c = "ccc"
pdb.set_trace()
final = a + b + c
print (final)

输出 (Output)

--Return--
> <ipython-input-9-a59ef5caf723>(5)<module>()->None
-> pdb.set_trace()
(Pdb) p a
'aaa'
(Pdb) p b
'bbb'
(Pdb) p c
'ccc'
(Pdb) p final
*** NameError: name 'final' is not defined
(Pdb) exit

在下面的脚本中,我们在程序的最后一行设置断点 -

import pdb
a = "aaa"
b = "bbb"
c = "ccc"
final = a + b + c
pdb.set_trace()
print (final)

输出如下 -

--Return--
> <ipython-input-11-8019b029997d>(6)<module>()->None
-> pdb.set_trace()
(Pdb) p a
'aaa'
(Pdb) p b
'bbb'
(Pdb) p c
'ccc'
(Pdb) p final
'aaabbbccc'
(Pdb)

Benchmarking and Profiling

在本章中,我们将学习基准测试和分析如何帮助解决性能问题。

假设我们编写了一个代码并且它也提供了所需的结果,但是如果我们想要更快地运行此代码,因为需求已经改变了。 在这种情况下,我们需要找出代码的哪些部分正在减慢整个程序的速度。 在这种情况下,基准测试和分析可能很有用。

什么是基准测试?

基准测试旨在通过​​与标准进行比较来评估某些内容。 然而,这里出现的问题是什么是基准测试以及为什么在软件编程的情况下我们需要它。 对代码进行基准测试意味着代码的执行速度和瓶颈所在。 基准测试的一个主要原因是它优化了代码。

基准测试如何运作?

如果我们谈论基准测试的工作,我们需要首先将整个程序作为一个当前状态进行基准测试然后我们可以结合微基准测试然后将程序分解为更小的程序。 为了找到我们计划中的瓶颈并进行优化。 换句话说,我们可以将它理解为将大而难的问题分解为一系列更小且更容易优化它们的问题。

Python module for benchmarking

在Python中,我们有一个默认的基准测试模块,称为timeit 。 在timeit模块的帮助下,我们可以在主程序中测量一小部分Python代码的性能。

例子 (Example)

在下面的Python脚本中,我们将导入timeit模块,该模块进一步测量执行两个函数所需的时间 - functionAfunctionB -

import timeit
import time
def functionA():
   print("Function A starts the execution:")
   print("Function A completes the execution:")
def functionB():
   print("Function B starts the execution")
   print("Function B completes the execution")
start_time = timeit.default_timer()
functionA()
print(timeit.default_timer() - start_time)
start_time = timeit.default_timer()
functionB()
print(timeit.default_timer() - start_time)

运行上面的脚本后,我们将得到两个函数的执行时间,如下所示。

输出 (Output)

Function A starts the execution:
Function A completes the execution:
0.0014599495514175942
Function B starts the execution
Function B completes the execution
0.0017024724827479076

使用装饰器功能编写我们自己的计时器

在Python中,我们可以创建自己的计时器,它就像timeit模块一样。 它可以在decorator功能的帮助下完成。 以下是自定义计时器的示例 -

import random
import time
def timer_func(func):
   def function_timer(*args, **kwargs):
   start = time.time()
   value = func(*args, **kwargs)
   end = time.time()
   runtime = end - start
   msg = "{func} took {time} seconds to complete its execution."
      print(msg.format(func = func.__name__,time = runtime))
   return value
   return function_timer
@timer_func
def Myfunction():
   for x in range(5):
   sleep_time = random.choice(range(1,3))
   time.sleep(sleep_time)
if __name__ == '__main__':
   Myfunction()

上面的python脚本有助于导入随机时间模块。 我们创建了timer_func()装饰器函数。 这里面有function_timer()函数。 现在,嵌套函数将在调用传入函数之前获取时间。 然后它等待函数返回并获取结束时间。 这样,我们终于可以让python脚本打印执行时间了。 该脚本将生成输出,如下所示。

输出 (Output)

Myfunction took 8.000457763671875 seconds to complete its execution.

什么是剖析?

有时程序员想要测量一些属性,如使用内存,时间复杂度或使用有关程序的特定指令来测量程序的实际能力。 这种关于程序的测量称为剖析。 分析使用动态程序分析来进行此类测量。

在随后的部分中,我们将了解用于分析的不同Python模块。

cProfile - 内置模块

cProfile是一个用于分析的Python内置模块。 该模块是一个C扩展,具有合理的开销,使其适用于分析长时间运行的程序。 运行后,它会记录所有功能和执行时间。 它非常强大,但有时难以解释和采取行动。 在以下示例中,我们在下面的代码中使用cProfile -

例子 (Example)

def increment_global():
   global x
   x += 1
def taskofThread(lock):
   for _ in range(50000):
   lock.acquire()
   increment_global()
   lock.release()
def main():
   global x
   x = 0
   lock = threading.Lock()
   t1 = threading.Thread(target=taskofThread, args=(lock,))
   t2 = threading.Thread(target= taskofThread, args=(lock,))
   t1.start()
   t2.start()
   t1.join()
   t2.join()
if __name__ == "__main__":
   for i in range(5):
      main()
   print("x = {1} after Iteration {0}".format(i,x))

上面的代码保存在thread_increment.py文件中。 现在,在命令行上使用cProfile执行代码,如下所示 -

(base) D:\ProgramData>python -m cProfile thread_increment.py
x = 100000 after Iteration 0
x = 100000 after Iteration 1
x = 100000 after Iteration 2
x = 100000 after Iteration 3
x = 100000 after Iteration 4
      3577 function calls (3522 primitive calls) in 1.688 seconds
   Ordered by: standard name
   ncalls tottime percall cumtime percall filename:lineno(function)
   5 0.000 0.000 0.000 0.000 <frozen importlib._bootstrap>:103(release)
   5 0.000 0.000 0.000 0.000 <frozen importlib._bootstrap>:143(__init__)
   5 0.000 0.000 0.000 0.000 <frozen importlib._bootstrap>:147(__enter__)
   … … … …

从上面的输出中可以清楚地看出,cProfile打印出所有被调用的3577个函数,包括每个函数所花费的时间以及它们被调用的次数。 以下是我们输出的列 -

  • ncalls - 这是调用的次数。

  • tottime - 这是在给定函数中花费的总时间。

  • percall - 它指的是tottime除以ncalls的商。

  • cumtime - 这是在这个和所有子功能中花费的累积时间。 它对于递归函数甚至是准确的。

  • percall - 它是cumtime除以原始调用的商。

  • filename:lineno(function) - 它基本上提供了每个函数的相应数据。

Concurrency in Python - Pool of Threads

假设我们必须为多线程任务创建大量线程。 由于线程太多,因此可能存在许多性能问题,这在计算上是最昂贵的。 一个主要问题可能是吞吐量受限。 我们可以通过创建一个线程池来解决这个问题。 线程池可以被定义为预先实例化和空闲线程的组,其准备好被给予工作。 当我们需要执行大量任务时,创建线程池优先于为每个任务实例化新线程。 线程池可以管理大量线程的并发执行,如下所示 -

  • 如果线程池中的线程完成其执行,则可以重用该线程。

  • 如果线程终止,则将创建另一个线程来替换该线程。

Python Module – Concurrent.futures

Python标准库包括concurrent.futures模块。 该模块是在Python 3.2中添加的,用于为开发人员提供启动异步任务的高级接口。 它是Python的线程和多处理模块之上的抽象层,用于提供使用线程池或进程池运行任务的接口。

在接下来的部分中,我们将了解concurrent.futures模块的不同类。

执行者类

Executorconcurrent.futures Python模块的抽象类。 它不能直接使用,我们需要使用以下具体子类之一 -

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ThreadPoolExecutor - 一个具体的子类

它是Executor类的具体子类之一。 子类使用多线程,我们获得了一个用于提交任务的线程池。 此池将任务分配给可用线程并安排它们运行。

如何创建ThreadPoolExecutor?

concurrent.futures模块及其具体的子类Executor的帮助下,我们可以轻松地创建一个线程池。 为此,我们需要构造一个ThreadPoolExecutor ,其中包含我们在池中想要的线程数。 默认情况下,该数字为5.然后我们可以向线程池提交任务。 当我们submit()任务时,我们会回到Future 。 Future对象有一个名为done()的方法,它告诉未来是否已经解决。 有了这个,就为该特定的未来对象设置了一个值。 任务完成后,线程池执行程序将值设置为future对象。

例子 (Example)

from concurrent.futures import ThreadPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message
def main():
   executor = ThreadPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

输出 (Output)

False
True
Completed

在上面的例子中, ThreadPoolExecutor已经构造了5个线程。 然后,在给出消息之前等待2秒的任务被提交给线程池执行器。 从输出中可以看出,任务直到2秒才完成,因此第一次调用done()将返回False。 2秒后,任务完成,我们通过调用result()方法得到未来的result()

实例化ThreadPoolExecutor - 上下文管理器

实例化ThreadPoolExecutor另一种方法是在上下文管理器的帮助下。 它的工作方式与上例中使用的方法类似。 使用上下文管理器的主要优点是它在语法上看起来很好。 实例化可以在以下代码的帮助下完成 -

with ThreadPoolExecutor(max_workers = 5) as executor

例子 (Example)

以下示例是从Python文档中借用的。 在此示例中,首先必须导入concurrent.futures模块。 然后创建一个名为load_url()的函数,它将加载请求的url。 然后,该函数使用ThreadPoolExecutor的5个线程创建ThreadPoolExecutorThreadPoolExecutor已被用作上下文管理器。 我们可以通过调用result()方法获得未来的result()

import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']
def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
   return conn.read()
with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor:
   future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
   for future in concurrent.futures.as_completed(future_to_url):
   url = future_to_url[future]
   try:
      data = future.result()
   except Exception as exc:
      print('%r generated an exception: %s' % (url, exc))
   else:
      print('%r page is %d bytes' % (url, len(data)))

输出 (Output)

以下是上述Python脚本的输出 -

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.cnn.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes

Use of Executor.map() function

Python map()函数广泛用于许多任务中。 一个这样的任务是将特定函数应用于迭代中的每个元素。 类似地,我们可以将迭代器的所有元素映射到一个函数,并将它们作为独立的作业提交给ThreadPoolExecutor 。 请考虑以下Python脚本示例,以了解该函数的工作原理。

例子 (Example)

在下面的示例中,map函数用于将square()函数应用于values数组中的每个值。

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ThreadPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
for result in results:
      print(result)
if __name__ == '__main__':
   main()

输出 (Output)

上面的Python脚本生成以下输出 -

4
9
16
25

Concurrency in Python - Pool of Processes

可以使用与创建和使用线程池相同的方式创建和使用进程池。 进程池可以定义为预先实例化和空闲进程的组,它们随时可以进行工作。 当我们需要执行大量任务时,创建进程池优先于为每个任务实例化新进程。

Python Module – Concurrent.futures

Python标准库有一个名为concurrent.futures的模块。 该模块是在Python 3.2中添加的,用于为开发人员提供启动异步任务的高级接口。 它是Python的线程和多处理模块之上的抽象层,用于提供使用线程池或进程池运行任务的接口。

在接下来的部分中,我们将查看concurrent.futures模块的不同子类。

执行者类

Executorconcurrent.futures Python模块的抽象类。 它不能直接使用,我们需要使用以下具体子类之一 -

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ProcessPoolExecutor - 一个具体的子类

它是Executor类的具体子类之一。 它使用多处理,我们获得了一组用于提交任务的流程。 此池将任务分配给可用进程并安排它们运行。

如何创建ProcessPoolExecutor?

concurrent.futures模块及其具体子类Executor的帮助下,我们可以轻松创建一个进程池。 为此,我们需要构建一个ProcessPoolExecutor ,其中包含我们在池中所需的进程数。 默认情况下,该数字为5.然后,将任务提交到流程池。

例子 (Example)

我们现在将考虑在创建线程池时使用的相同示例,唯一的区别是现在我们将使用ProcessPoolExecutor而不是ThreadPoolExecutor

from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message
def main():
   executor = ProcessPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

输出 (Output)

False
False
Completed

在上面的示例中,Process PoolExecutor已构造为5个线程。 然后,在给出消息之前等待2秒的任务被提交给进程池执行器。 从输出中可以看出,任务直到2秒才完成,因此第一次调用done()将返回False。 2秒后,任务完成,我们通过调用result()方法得到未来的result()

实例化ProcessPoolExecutor - 上下文管理器

实例化ProcessPoolExecutor的另一种方法是在上下文管理器的帮助下。 它的工作方式与上例中使用的方法类似。 使用上下文管理器的主要优点是它在语法上看起来很好。 实例化可以在以下代码的帮助下完成 -

with ProcessPoolExecutor(max_workers = 5) as executor

例子 (Example)

为了更好地理解,我们采用与创建线程池时使用的相同的示例。 在此示例中,我们需要先导入concurrent.futures模块。 然后创建一个名为load_url()的函数,它将加载请求的url。 然后使用池中的5个线程创建ProcessPoolExecutor 。 Process PoolExecutor已被用作上下文管理器。 我们可以通过调用result()方法获得未来的result()

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request
URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']
def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
      return conn.read()
def main():
   with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
      future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
      for future in concurrent.futures.as_completed(future_to_url):
      url = future_to_url[future]
      try:
         data = future.result()
      except Exception as exc:
         print('%r generated an exception: %s' % (url, exc))
      else:
         print('%r page is %d bytes' % (url, len(data)))
if __name__ == '__main__':
   main()

输出 (Output)

上面的Python脚本将生成以下输出 -

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes

使用Executor.map()函数

Python map()函数广泛用于执行许多任务。 一个这样的任务是将特定函数应用于迭代中的每个元素。 类似地,我们可以将迭代器的所有元素映射到函数,并将它们作为独立作业提交给ProcessPoolExecutor 。 请考虑以下Python脚本示例来理解这一点。

例子 (Example)

我们将考虑使用Executor.map()函数创建线程池时使用的相同示例。 在下面给出的示例中,map函数用于将square()函数应用于values数组中的每个值。

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ProcessPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
   for result in results:
      print(result)
if __name__ == '__main__':
   main()

输出 (Output)

上面的Python脚本将生成以下输出

4
9
16
25

何时使用ProcessPoolExecutor和ThreadPoolExecutor?

既然我们已经研究了Executor类 - ThreadPoolExecutor和ProcessPoolExecutor,我们需要知道何时使用哪个执行器。 我们需要在遇到CPU限制的工作负载时选择ProcessPoolExecutor,在I/O绑定工作负载的情况下选择ThreadPoolExecutor。

如果我们使用ProcessPoolExecutor ,那么我们不需要担心GIL,因为它使用多处理。 而且,与ThreadPoolExecution相比,执行时间会更短。 请考虑以下Python脚本示例来理解这一点。

例子 (Example)

import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start
def main():
   start = time.time()
   with concurrent.futures.ProcessPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
   print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()

输出 (Output)

Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207
Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start
def main():
   start = time.time()
   with concurrent.futures.ThreadPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
      print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()

输出 (Output)

Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645

从上述两个程序的输出中,我们可以看到使用ProcessPoolExecutorThreadPoolExecutor执行时间的差异。

Concurrency in Python - Multiprocessing

在本章中,我们将更多地关注多处理和多线程之间的比较。

多处理器 Multiprocessing

它是在单个计算机系统中使用两个或更多CPU单元。 通过利用我们计算机系统中可用的全部CPU内核,这是从硬件中充分发挥潜力的最佳方法。

多线程 Multithreading

通过并发执行多个线程,CPU能够管理操作系统的使用。 多线程的主要思想是通过将进程划分为多个线程来实现并行性。

下表显示了它们之间的一些重要差异 -

Sr.No. 多道程序
1 多处理是指多个CPU同时处理多个进程。 多道程序同时在主存储器中保存多个程序,并使用单个CPU同时执行它们。
2It utilizes multiple CPUs.It utilizes single CPU.
3It permits parallel processing.Context switching takes place.
4 减少处理工作所需的时间。 处理工作所需的时间更多。
5 它有助于高效利用计算机系统的设备。 Less efficient than multiprocessing.
6Usually more expensive. 这种系统较便宜。

消除全球翻译锁(GIL)的影响

在使用并发应用程序时,Python中存在一个名为GIL (Global Interpreter Lock) 。 GIL从不允许我们使用多个CPU内核,因此我们可以说Python中没有真正的线程。 GIL是互斥锁 - 互斥锁,它使线程安全。 换句话说,我们可以说GIL阻止多个线程并行执行Python代码。 锁一次只能由一个线程保存,如果我们想要执行一个线程,那么它必须首先获取锁。

通过使用多处理,我们可以有效地绕过GIL引起的限制 -

  • 通过使用多处理,我们正在利用多个流程的能力,因此我们正在利用GIL的多个实例。

  • 因此,在任何时候都没有限制在我们的程序中执行一个线程的字节码。

在Python中启动进程

以下三种方法可用于在多处理模块中以Python启动进程 -

  • Fork
  • Spawn
  • Forkserver

使用Fork创建进程

Fork命令是UNIX中的标准命令。 它用于创建称为子进程的新进程。 此子进程与称为父进程的进程同时运行。 这些子进程也与其父进程相同,并继承了父进程可用的所有资源。 使用Fork创建进程时使用以下系统调用 -

  • fork() - 这是一个通常在内核中实现的系统调用。 它用于创建process.p的副本“

  • getpid() - 此系统调用返回调用进程的进程ID(PID)。

例子 (Example)

以下Python脚本示例将帮助您了解如何创建新的子进程并获取子进程和父进程的PID -

import os
def child():
   n = os.fork()
   if n > 0:
      print("PID of Parent process is : ", os.getpid())
   else:
      print("PID of Child process is : ", os.getpid())
child()

输出 (Output)

PID of Parent process is : 25989
PID of Child process is : 25990

使用Spawn创建进程

Spawn意味着开始新事物。 因此,产生进程意味着父进程创建新进程。 父进程以异步方式继续执行,或等待子进程结束执行。 按照以下步骤生成流程 -

  • 导入多处理模块。

  • 创建对象流程。

  • 通过调用start()方法启动进程活动。

  • 等待进程完成其工作并通过调用join()方法退出。

例子 (Example)

以下Python脚本示例有助于生成三个进程

import multiprocessing
def spawn_process(i):
   print ('This is process: %s' %i)
   return
if __name__ == '__main__':
   Process_jobs = []
   for i in range(3):
   p = multiprocessing.Process(target = spawn_process, args = (i,))
      Process_jobs.append(p)
   p.start()
   p.join()

输出 (Output)

This is process: 0
This is process: 1
This is process: 2

使用Forkserver创建进程

Forkserver机制仅适用于那些支持通过Unix管道传递文件描述符的所选UNIX平台。 考虑以下几点来理解Forkserver机制的工作原理 -

  • 使用Forkserver机制实例化服务器以启动新进程。

  • 然后,服务器接收命令并处理创建新进程的所有请求。

  • 为了创建一个新进程,我们的python程序将向Forkserver发送一个请求,它将为我们创建一个进程。

  • 最后,我们可以在我们的程序中使用这个新创建的过程。

守护程序在Python中处理

Python multiprocessing模块允许我们通过其守护进程选项获得守护进程。 守护程序进程或在后台运行的进程遵循与守护程序线程类似的概念。 要在后台执行该过程,我们需要将守护程序标志设置为true。 只要主进程正在执行,守护程序进程将继续运行,并且它将在完成执行后或主程序被终止时终止。

例子 (Example)

在这里,我们使用与守护程序线程中使用的相同的示例。 唯一的区别是模块从multithreading更改为multithreading multiprocessing并将守护程序标志设置为true。 但是,输出会有变化,如下所示 -

import multiprocessing
import time
def nondaemonProcess():
   print("starting my Process")
   time.sleep(8)
   print("ending my Process")
def daemonProcess():
   while True:
   print("Hello")
   time.sleep(2)
if __name__ == '__main__':
   nondaemonProcess = multiprocessing.Process(target = nondaemonProcess)
   daemonProcess = multiprocessing.Process(target = daemonProcess)
   daemonProcess.daemon = True
   nondaemonProcess.daemon = False
   daemonProcess.start()
   nondaemonProcess.start()

输出 (Output)

starting my Process
ending my Process

与守护程序线程生成的输出相比,输出是不同的,因为没有守护程序模式的进程具有输出。 因此,守护进程在主程序结束后自动结束,以避免运行进程的持久性。

在Python中终止进程

我们可以使用terminate()方法立即终止或终止进程。 我们将在完成执行之前立即使用此方法终止在函数帮助下创建的子进程。

例子 (Example)

import multiprocessing
import time
def Child_process():
   print ('Starting function')
   time.sleep(5)
   print ('Finished function')
P = multiprocessing.Process(target = Child_process)
P.start()
print("My Process has terminated, terminating main thread")
print("Terminating Child Process")
P.terminate()
print("Child Process successfully terminated")

输出 (Output)

My Process has terminated, terminating main thread
Terminating Child Process
Child Process successfully terminated

输出显示程序在执行子进程之前终止,该子进程是在Child_process()函数的帮助下创建的。 这意味着子进程已成功终止。

在Python中识别当前进程

操作系统中的每个进程都具有称为PID的进程标识。 在Python中,我们可以借助以下命令找出当前进程的PID -

import multiprocessing
print(multiprocessing.current_process().pid)

例子 (Example)

下面的Python脚本示例有助于找出主进程的PID以及子进程的PID -

import multiprocessing
import time
def Child_process():
   print("PID of Child Process is: {}".format(multiprocessing.current_process().pid))
print("PID of Main process is: {}".format(multiprocessing.current_process().pid))
P = multiprocessing.Process(target=Child_process)
P.start()
P.join()

输出 (Output)

PID of Main process is: 9401
PID of Child Process is: 9402

在子类中使用进程

我们可以通过对threading.Thread类进行子类化来创建线程。 此外,我们还可以通过对multiprocessing.Process类进行子类化来创建进程。 对于在子类中使用进程,我们需要考虑以下几点 -

  • 我们需要定义Process类的新子类。

  • 我们需要覆盖_init_(self [,args] )类。

  • 我们需要覆盖run(self [,args] )方法来实现什么Process

  • 我们需要通过调用start()方法来启动该过程。

例子 (Example)

import multiprocessing
class MyProcess(multiprocessing.Process):
   def run(self):
   print ('called run method in process: %s' %self.name)
   return
if __name__ == '__main__':
   jobs = []
   for i in range(5):
   P = MyProcess()
   jobs.append(P)
   P.start()
   P.join()

输出 (Output)

called run method in process: MyProcess-1
called run method in process: MyProcess-2
called run method in process: MyProcess-3
called run method in process: MyProcess-4
called run method in process: MyProcess-5

Python多处理模块 - 池类

如果我们在Python应用程序中讨论简单的并行processing任务,那么多处理模块为我们提供了Pool类。 Pool类的以下方法可用于在主程序中启动子进程的数量

apply() method

此方法类似于.submit()方法.ThreadPoolExecutor. 它会阻塞,直到结果准备就绪。

apply_async() method

当我们需要并行执行任务时,我们需要使用apply_async()方法将任务提交到池中。 它是一个异步操作,在执行所有子进程之前不会锁定主线程。

map() method

就像apply()方法一样,它也会阻塞,直到结果准备好。 它等同于内置的map()函数,该函数将可迭代数据拆分为多个块,并作为单独的任务提交给进程池。

map_async() method

它是map()方法的变体,因为apply_async()apply()方法。 它返回一个结果对象。 结果准备就绪后,将对其应用可调用对象。 必须立即完成赎回; 否则,处理结果的线程将被阻止。

例子 (Example)

以下示例将帮助您实现用于执行并行执行的进程池。 通过multiprocessing.Pool方法应用square()函数,已经执行了简单的数字square()计算。 然后pool.map()用于提交5,因为input是0到4之间的整数列表。结果将存储在p_outputs并打印出来。

def square(n):
   result = n*n
   return result
if __name__ == '__main__':
   inputs = list(range(5))
   p = multiprocessing.Pool(processes = 4)
   p_outputs = pool.map(function_square, inputs)
   p.close()
   p.join()
   print ('Pool :', p_outputs)

输出 (Output)

Pool : [0, 1, 4, 9, 16]

Processes Intercommunication

流程互通是指流程之间的数据交换。 有必要在进程之间交换数据以开发并行应用程序。 下图显示了多个子进程之间同步的各种通信机制 -

互通

各种沟通机制

在本节中,我们将了解各种通信机制。 机制如下所述 -

Queues

队列可以与多进程程序一起使用。 multiprocessing模块的Queue类与Queue.Queue类相似。 因此,可以使用相同的API。 Multiprocessing .Queue为我们提供了一个线程和进程安全的FIFO(先进先出)进程之间的通信机制。

例子 (Example)

下面是一个简单的例子,它来自python官方文档的多处理,以理解Queue类的多处理的概念。

from multiprocessing import Process, Queue
import queue
import random
def f(q):
   q.put([42, None, 'hello'])
def main():
   q = Queue()
   p = Process(target = f, args = (q,))
   p.start()
   print (q.get())
if __name__ == '__main__':
   main()

输出 (Output)

[42, None, 'hello']

Pipes

它是一种数据结构,用于在多进程程序中的进程之间进行通信。 Pipe()函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。 它以下列方式工作 -

  • 它返回一对连接对象,表示管道的两端。

  • 每个对象都有两个方法 - send()recv() ,以便在进程之间进行通信。

例子 (Example)

下面是一个简单的例子,它来自python官方文档的多处理,以理解多处理的Pipe()函数的概念。

from multiprocessing import Process, Pipe
def f(conn):
   conn.send([42, None, 'hello'])
   conn.close()
if __name__ == '__main__':
   parent_conn, child_conn = Pipe()
   p = Process(target = f, args = (child_conn,))
   p.start()
   print (parent_conn.recv())
   p.join()

输出 (Output)

[42, None, 'hello']

Manager

Manager是一类多处理模块,它提供了一种在所有用户之间协调共享信息的方法。 管理器对象控制服务器进程,该进程管理共享对象并允许其他进程操作它们。 换句话说,管理者提供了一种创建可在不同流程之间共享的数据的方法。 以下是经理对象的不同属性 -

  • 管理器的主要属性是控制管理共享对象的服务器进程。

  • 另一个重要属性是在任何进程修改它时更新所有共享对象。

例子 (Example)

下面是一个示例,它使用manager对象在服务器进程中创建列表记录,然后在该列表中添加新记录。

import multiprocessing
def print_records(records):
   for record in records:
      print("Name: {0}\nScore: {1}\n".format(record[0], record[1]))
def insert_record(record, records):
   records.append(record)
      print("A New record is added\n")
if __name__ == '__main__':
   with multiprocessing.Manager() as manager:
      records = manager.list([('Computers', 1), ('Histoty', 5), ('Hindi',9)])
      new_record = ('English', 3)
      p1 = multiprocessing.Process(target = insert_record, args = (new_record, records))
      p2 = multiprocessing.Process(target = print_records, args = (records,))
	  p1.start()
      p1.join()
      p2.start()
      p2.join()

输出 (Output)

A New record is added
Name: Computers
Score: 1
Name: Histoty
Score: 5
Name: Hindi
Score: 9
Name: English
Score: 3

管理器中命名空间的概念

Manager Class带有命名空间的概念,这是一种跨多个进程共享多个属性的快捷方法。 命名空间没有任何可以调用的公共方法,但它们具有可写属性。

例子 (Example)

以下Python脚本示例帮助我们利用名称空间在主进程和子进程之间共享数据 -

import multiprocessing
def Mng_NaSp(using_ns):
   using_ns.x +=5
   using_ns.y *= 10
if __name__ == '__main__':
   manager = multiprocessing.Manager()
   using_ns = manager.Namespace()
   using_ns.x = 1
   using_ns.y = 1
   print ('before', using_ns)
   p = multiprocessing.Process(target = Mng_NaSp, args = (using_ns,))
   p.start()
   p.join()
   print ('after', using_ns)

输出 (Output)

before Namespace(x = 1, y = 1)
after Namespace(x = 6, y = 10)

Ctypes-Array和Value

多处理模块提供Array和Value对象,用于将数据存储在共享内存映射中。 Array是从共享内存分配的ctypes数组, Value是从共享内存分配的ctypes对象。

为了使用,从多处理导入Process,Value,Array。

例子 (Example)

以下Python脚本是一个从python文档中获取的示例,它利用Ctypes Array和Value在进程之间共享一些数据。

def f(n, a):
   n.value = 3.1415927
   for i in range(len(a)):
   a[i] = -a[i]
if __name__ == '__main__':
   num = Value('d', 0.0)
   arr = Array('i', range(10))
   p = Process(target = f, args = (num, arr))
   p.start()
   p.join()
   print (num.value)
   print (arr[:])

输出 (Output)

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Communicating Sequential Processes (CSP)

CSP用于说明系统与具有并发模型的其他系统的交互。 CSP是一个通过消息传递编写并发或程序的框架,因此它对描述并发性很有效。

Python library – PyCSP

为了实现CSP中的核心原语,Python有一个名为PyCSP的库。 它使实现非常简短和可读,因此可以非常容易地理解它。 以下是PyCSP的基本流程网络 -

PyCSP

在上面的PyCSP过程网络中,有两个过程 - 过程1和过程2.这些过程通过两个通道传递消息进行通信 - 通道1和通道2。

安装PyCSP

借助以下命令,我们可以安装Python库PyCSP -

pip install PyCSP

例子 (Example)

以下Python脚本是一个相互并行运行两个进程的简单示例。 它是在PyCSP python libabary的帮助下完成的 -

from pycsp.parallel import *
import time
@process
def P1():
   time.sleep(1)
   print('P1 exiting')
@process
def P2():
   time.sleep(1)
   print('P2 exiting')
def main():
   Parallel(P1(), P2())
   print('Terminating')
if __name__ == '__main__':
   main()

在上面的脚本中,创建了两个函数,即P1P2 ,然后使用@process进行修饰,以将它们转换为进程。

输出 (Output)

P2 exiting
P1 exiting
Terminating

Event-Driven Programming

事件驱动的编程侧重于事件。 最终,程序的流程取决于事件。 到目前为止,我们处理顺序或并行执行模型,但具有事件驱动编程概念的模型称为异步模型。 事件驱动的编程取决于始终侦听新传入事件的事件循环。 事件驱动编程的工作取决于事件。 一旦事件循环,然后事件决定执行什么以及以什么顺序执行。 以下流程图将帮助您了解其工作原理 -

驱动

Python Module – Asyncio

Asyncio模块是在Python 3.4中添加的,它提供了使用协同例程编写单线程并发代码的基础结构。 以下是Asyncio模块使用的不同概念 -

事件循环

事件循环是处理计算代码中的所有事件的功能。 它在整个程序的执行过程中发挥作用,并跟踪事件的传入和执行。 Asyncio模块允许每个进程使用一个事件循环。 以下是Asyncio模块提供的一些管理事件循环的方法 -

  • loop = get_event_loop() - 此方法将为当前上下文提供事件循环。

  • loop.call_later(time_delay,callback,argument) - 此方法安排在给定的time_delay秒之后调用的回调。

  • loop.call_soon(callback,argument) - 此方法安排尽快调用的回调。 在call_soon()返回并且控件返回到事件循环时调用回调。

  • loop.time() - 此方法用于根据事件循环的内部时钟返回当前时间。

  • asyncio.set_event_loop() - 此方法将当前上下文的事件循环设置为循环。

  • asyncio.new_event_loop() - 此方法将创建并返回一个新的事件循环对象。

  • loop.run_forever() - 此方法将一直运行,直到调用stop()方法。

例子 (Example)

以下事件循环示例通过使用get_event_loop()方法帮助打印hello world 。 此示例取自Python官方文档。

import asyncio
def hello_world(loop):
   print('Hello World')
   loop.stop()
loop = asyncio.get_event_loop()
loop.call_soon(hello_world, loop)
loop.run_forever()
loop.close()

输出 (Output)

Hello World

Futures

这与concurrent.futures.Future类兼容,该类表示尚未完成的计算。 asyncio.futures.Future和concurrent.futures.Future之间存在以下差异 -

  • result()和exception()方法不会使用超时参数,并在未来尚未完成时引发异常。

  • 用add_done_callback()注册的回调总是通过事件循环的call_soon()调用。

  • asyncio.futures.Future类与concurrent.futures包中的wait()和as_completed()函数不兼容。

例子 (Example)

以下是一个示例,可帮助您了解如何使用asyncio.futures.future类。

import asyncio
async def Myoperation(future):
   await asyncio.sleep(2)
   future.set_result('Future Completed')
loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(Myoperation(future))
try:
   loop.run_until_complete(future)
   print(future.result())
finally:
   loop.close()

输出 (Output)

Future Completed

协同程序(Coroutines)

Asyncio中的协同程序的概念类似于线程模块下的标准Thread对象的概念。 这是子程序概念的概括。 协程可以在执行期间暂停,以便等待外部处理并从外部处理完成时停止的点返回。 以下两种方式帮助我们实现协同程序 -

async def function()

这是在Asyncio模块下实现协同程序的方法。 以下是相同的Python脚本 -

import asyncio
async def Myoperation():
   print("First Coroutine")
loop = asyncio.get_event_loop()
try:
   loop.run_until_complete(Myoperation())
finally:
   loop.close()

输出 (Output)

First Coroutine

@asyncio.coroutine decorator

实现协同程序的另一种方法是利用@ asyncio.coroutine装饰器来生成生成器。 以下是相同的Python脚本 -

import asyncio
@asyncio.coroutine
def Myoperation():
   print("First Coroutine")
loop = asyncio.get_event_loop()
try:
   loop.run_until_complete(Myoperation())
finally:
   loop.close()

输出 (Output)

First Coroutine

Tasks

Asyncio模块的这个子类负责以并行方式在事件循环中执行协同程序。 以下Python脚本是并行处理某些任务的示例。

import asyncio
import time
async def Task_ex(n):
   time.sleep(1)
   print("Processing {}".format(n))
async def Generator_task():
   for i in range(10):
      asyncio.ensure_future(Task_ex(i))
   int("Tasks Completed")
   asyncio.sleep(2)
loop = asyncio.get_event_loop()
loop.run_until_complete(Generator_task())
loop.close()

输出 (Output)

Tasks Completed
Processing 0
Processing 1
Processing 2
Processing 3
Processing 4
Processing 5
Processing 6
Processing 7
Processing 8
Processing 9

运输(Transports)

Asyncio模块提供用于实现各种类型通信的传输类。 这些类不是线程安全的,并且在建立通信信道之后总是与协议实例配对。

以下是从BaseTransport继承的不同类型的传输 -

  • ReadTransport - 这是只读传输的接口。

  • WriteTransport - 这是只写传输的接口。

  • DatagramTransport - 这是用于发送数据的接口。

  • BaseSubprocessTransport - 类似于BaseSubprocessTransport类。

以下是BaseTransport类的五种不同方法,这些方法随后在四种传输类型中是瞬态的 -

  • close() - 关闭运输。

  • is_closing() - 如果传输正在关闭或已经关闭,则此方法将返回true。传输。

  • get_extra_info(name, default = none) - 这将为我们提供有关传输的一些额外信息。

  • get_protocol() - 此方法将返回当前协议。

协议Protocols

Asyncio模块提供了可以子类化以实现网络协议的基类。 这些课程与运输一起使用; 协议解析传入的数据并请求传出数据的写入,而传输负责实际的I/O和缓冲。 以下是三类议定书 -

  • Protocol - 这是实现用于TCP和SSL传输的流协议的基类。

  • DatagramProtocol - 这是用于实现与UDP传输一起使用的数据报协议的基类。

  • SubprocessProtocol - 这是用于实现通过一组单向管道与子进程通信的协议的基类。

Reactive Programming

反应式编程是一种编程范式,用于处理数据流和变化的传播。 这意味着当一个组件发出数据流时,更改将通过响应式编程库传播到其他组件。 变化的传播将持续到最终接收器。 事件驱动和反应式编程之间的区别在于事件驱动的编程围绕事件而反应式编程围绕数据。

ReactiveX或RX用于反应式编程

ReactiveX或Raective Extension是最着名的反应式编程实现。 ReactiveX的工作取决于以下两个类 -

可观察的类

此类是数据流或事件的来源,它打包传入的数据,以便数据可以从一个线程传递到另一个线程。 在某些观察者订阅数据之前,它不会提供数据。

观察者班

此类使用observable发出的数据流。 可以有多个具有可观察性的观察者,每个观察者将接收发射的每个数据项。 观察者可以通过订阅观察者来接收三种类型的事件 -

  • on_next() event - 它意味着数据流中有一个元素。

  • on_completed() event - 它意味着排放结束,不再有物品到来。

  • on_error() event - 它还意味着发射结束,但是在observable抛出错误的情况下。

RxPY - 用于反应式编程的Python模块

RxPY是一个Python模块,可用于反应式编程。 我们需要确保安装该模块。 以下命令可用于安装RxPY模块 -

pip install RxPY

例子 (Example)

以下是一个Python脚本,它使用RxPY模块及其ObservableObserve forObserve for反应式编程。 基本上有两个类 -

  • get_strings() - 用于从观察者获取字符串。

  • PrintObserver() - 用于从观察者打印字符串。 它使用观察者类的所有三个事件。 它还使用了subscribe()类。

from rx import Observable, Observer
def get_strings(observer):
   observer.on_next("Ram")
   observer.on_next("Mohan")
   observer.on_next("Shyam")
      observer.on_completed()
class PrintObserver(Observer):
   def on_next(self, value):
      print("Received {0}".format(value))
   def on_completed(self):
   print("Finished")
   def on_error(self, error):
      print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())

输出 (Output)

Received Ram
Received Mohan
Received Shyam
Finished

用于反应式编程的PyFunctional库

PyFunctional是另一个可用于反应式编程的Python库。 它使我们能够使用Python编程语言创建功能程序。 它很有用,因为它允许我们使用链式函数运算符创建数据管道。

RxPY和PyFunctional之间的差异

这两个库都用于反应式编程并以类似的方式处理流,但它们之间的主要区别取决于数据的处理。 RxPY处理系统中的数据和事件,而PyFunctional则专注于使用函数式编程范例转换数据。

安装PyFunctional模块

我们需要在使用之前安装此模块。 它可以在pip命令的帮助下安装如下 -

pip install pyfunctional

例子 (Example)

下面的示例使用the PyFunctional模块及其seq类,它们充当我们可以迭代和操作的流对象。 在这个程序中,它使用将每个值加倍的lamda函数映射序列,然后过滤x大于4的值,最后将序列缩减为所有剩余值的总和。

from functional import seq
result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)
print ("Result: {}".format(result))

输出 (Output)

Result: 6
↑回到顶部↑
WIKI教程 @2018