多线程编程

threading模块

threading 模块的对象
对 象 描 述
Thread 表示一个执行线程的对象
Lock 锁原语对象(和 thread 模块中的锁一样)
RLock 可重入锁对象,使单一线程可以(再次)获得已持有的锁(递归锁)
Condition 条件变量对象,使得一个线程等待另一个线程满足特定的“条件”,比如改变状态或 某个数据值
Event 条件变量的通用版本,任意数量的线程等待某个事件的发生,在该事件发生后所有 线程将被激活
Semaphore 为线程间共享的有限资源提供了一个“计数器”,如果没有可用资源时会被阻塞
BoundedSemaphore 与 Semaphore 相似,不过它不允许超过初始值
Timer 与 Thread 相似,不过它要在运行前等待一段时间
Barrier① 创建一个“障碍”,必须达到指定数量的线程后才可以继续
守护线程
thread模块,不支持守护线程这个概念。
当主线程退出时,所有子线程都将终止,不管它们是否仍在工作
如果你不希望发生这种行为,就要引入守护线程的概念了
threading 模块支持守护线程,其工作方式是:守护线程一般是一个等待客户端请求服务的服务器。
如果没有客户端请求,守护线程就是空闲的。
如果把一个线程设置为守护线程,就表示这个线程是不重要的,进程退出时不需要等待这个线程执行完成。
要将一个线程设置为守护线程,需要在启动线程之前执行如下赋值语句:thread.daemon = True
同样,要检查线程的守护状态,也只需要检查这个值即可,一个新的子线程会继承父线程的守护标记。
整个 Python 程序(主线程)将在所有非守护线程退出之后才退出
换句话说,就是没有剩下存活的非守护线程时。

Thread 类

threading 模块的 Thread 类是主要的执行对象。它有 thread 模块中没有的很多函数。

Thread

对象的属性和方法

属 性 描 述
Thread 对象数据属性
name 线程名
ident 线程的标识符
daemon 布尔标志,表示这个线程是否是守护线程
Thread 对象方法
init(group=None, tatget=None, name=None, args=(), kwargs ={}, verbose=None, daemon=None) ③ 实例化一个线程对象,需要有一个可调用的 target,以及其参数 args 或 kwargs。还可以传递 name 或 group 参数,不过后者还未实现。此 外 , verbose 标 志 也 是 可 接 受 的。 而 daemon 的 值 将 会 设定 thread.daemon 属性/标志
属 性 描 述
start() 开始执行该线程
run() 定义线程功能的方法(通常在子类中被应用开发者重写)
join (timeout=None) 直至启动的线程终止之前一直挂起;除非给出了 timeout(秒),否则 会一直阻塞
getName()① 返回线程名
setName (name)① 设定线程名
isAlivel /is_alive ()② 布尔标志,表示这个线程是否还存活
isDaemon()③ 如果是守护线程,则返回 True;否则,返回 False
setDaemon(daemonic)③ 把线程的守护标志设定为布尔值 daemonic(必须在线程 start()之前 调用)

创建实例

启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/usr/bin/env python
# -*- coding:utf8 -*-

import threading
import time

def loop():
strat_time = time.time()
print('thread %s is running...' % threading.current_thread().name) #当前进程名
n = 0
while n < 5:
n = n + 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)
end_time = time.time()
all_time = end_time - strat_time
print("共用时:%s" % all_time)


print('thread %s is running...time = %s' % (threading.current_thread().name,time.ctime()))
t = threading.Thread(target=loop, name='LoopThread') # 创建Thread实例
t.start() # 开始执行
t.join() # 阻塞主线程
print('thread %s ended. time = %s ' % (threading.current_thread().name,time.ctime()))

Lock

多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,

而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,

因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

一个用Lock解决的示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time, threading

# 假定这是你的银行存款:
balance = 0
lock=threading.Lock() #创建锁

def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n

def run_thread(n):
for i in range(100000):
lock.acquire() # 申请锁
try:
change_it(n)
finally:
lock.release() # 释放锁

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))

t1.start()
t2.start()

t1.join()
t2.join()

print("The Res is %d" % balance)

生产者-消费者问题和queue模块

queue模块,提供线程间通信的机制,从而让线程之间可以互相分享数据

具体而言,就是创建一个队列,让生产者(线程)在其中放入新的商品,而消费者(线程)消费这些商品

queue

模块常用属性

属 性 描 述
queue 模块的类
Queue(maxsize=0) 创建一个先入先出队列。如果给定最大值,则在队列没有空间时阻塞;否则(没 有指定最大值),为无限队列
LifoQueue(maxsize=0) 创建一个后入先出队列。如果给定最大值,则在队列没有空间时阻塞;否则(没 有指定最大值),为无限队列
PriorityQueue(maxsize=0) 创建一个优先级队列。如果给定最大值,则在队列没有空间时阻塞,否则(没 有指定最大值) ,为无限队列
Queue/queue 异常
Empty 当对空队列调用 get*()方法时抛出异常
Full 当对已满的队列调用 put*()方法时抛出异常
Queue/queue 对象方法
qsize () 返回队列大小(由于返回时队列大小可能被其他线程修改,所以该值为近似值)
empty() 如果队列为空,则返回 True;否则,返回 False
full() 如果队列已满,则返回 True;否则,返回 False
put (item, block=Ture, timeout=None) 将 item 放入队列。如果 block 为 True(默认)且 timeout 为 None,则在有可用 空间之前阻塞;如果 timeout 为正值,则最多阻塞 timeout 秒;如果 block 为 False, 则抛出 Empty 异常
put_nowait(item) 和 put(item, False)相同
get (block=True, timeout=None) 从队列中取得元素。如果给定了 block(非 0),则一直阻塞到有可用的元素 为止
get_nowait() 和 get(False)相同
task_done() 用于表示队列中的某个元素已执行完成,该方法会被下面的 join()使用
join() 在队列中所有元素执行完毕并调用上面的 task_done()信号之前,保持阻塞

MyThread.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#!/usr/bin/env python

import threading
from time import ctime


class MyThread(threading.Thread):
def __init__(self, func, args, name=''):
threading.Thread.__init__(self)
self.func = func
self.name = name
self.args = args

def run(self):
print('开始执行', self.name, ' 在:', ctime())
self.res = self.func(*self.args)
print(self.name, '结束于:', ctime())

def getResult(self):
return self.res

product.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#!/usr/bin/env python
# -*- coding:utf8 -*-
from random import randint
from time import sleep
from queue import Queue
from MyThread import MyThread


# 将一个对象放入队列中
def writeQ(queue):
print('正在为队列生产………')
queue.put('商品', 1)
print('当前商品总数:', queue.qsize())


# 消费队列中的一个对象
def readQ(queue):
val = queue.get(1)
print('正在从队列中消费商品……消费后还剩余商品:', queue.qsize())


# 模仿生产者。
def writer(queue, loops):
for i in range(loops):
writeQ(queue)
sleep(randint(1, 3)) # writer的睡眠时间一般比reader短,是为了阻碍 reader从空队列中获取对象,换句话说就是使得轮到reader执行时,已存在可消费对象的可能性更大。


# 模仿消费者
def reader(queue, loops):
for i in range(loops):
readQ(queue)
sleep(randint(2, 5))


funcs = [writer, reader]
nfuncs = range(len(funcs))


def main():
nloops = randint(2, 5) # randint 和randrange类似,区别在于,randrange是半开半闭区间,而randint是闭区间
q = Queue(32)

threads = [] # 模拟线程池
for i in nfuncs:
t = MyThread(funcs[i], (q, nloops), funcs[i].__name__) # 创建线程
threads.append(t)

for i in nfuncs:
threads[i].start() # 开始执行线程

for i in nfuncs:
threads[i].join()

print('结束')


if __name__ == '__main__':
main()