Python 3 concurrent.futures 多进程 并发写文件

Python 3 的 concurrent.futures 里面封装了多进程 ProcessPoolExecutor 和多线程 ThreadPoolExecutor,代码写起来非常简单,一个简单的示例:

1
2
3
4
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result())

executor 有两个常用的方法 submitmap,前者用于单次提交任务,后者用于批量提交。

下面是多进行写文件的方法,参考了 stackoverflow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#!/usr/bin/env python
# encoding: utf-8

import multiprocessing
from concurrent.futures import ProcessPoolExecutor

import time

def do(file_name, i, lock):
with lock:
with open(file_name, 'a+') as f:
f.writelines(["i=" + str(i) + ":" + str(x) + "\n" for x in range(3)])

def main():
pool = ProcessPoolExecutor(max_workers=4)
m = multiprocessing.Manager()
lock = m.Lock()
futures = [pool.submit(do, "./test.txt", num, lock) for num in range(3)]
for future in futures:
future.result()


if __name__ == '__main__':
main()

输出结果:

1
2
3
4
5
6
7
8
9
i=0:0
i=0:1
i=0:2
i=2:0
i=2:1
i=2:2
i=1:0
i=1:1
i=1:2

为什么这样写呢,因为最开始我是在主函数里面收集 do 的返回值,统一把结果写到文件里面,后来遇到特别大的文件把内存搞爆了,机器死机了;

后来想着把一个 open('./test.txt', 'a+') as f 作为参数传给 do 方法,结果就是报错:

TypeError: cannot serialize '_io.TextIOWrapper' object file 对象不能序列化。

所以要用上面的方法,传递文件名给各个进程,每个进程单独去写文件,同时在写文件之前加锁。