Processing Data with Python Multiprocessing
Table of Contents
- 1. multiprocessing.Process
- 2. multiprocessing.Pool.starmap
- 3. multiprocessing.Pool.starmap_async
1. multiprocessing.Process
```python
# _*_ coding: utf-8 _*_
# @Time : 2022/10/16 15:45
# @Author : Michael
# @File : multiprocess_demo.py
# @desc :
import multiprocessing
import os


class compute_process(multiprocessing.Process):  # worker process that does the computation
    def __init__(self, input_datafile, output_datafile):
        super(compute_process, self).__init__()
        self.input_datafile = input_datafile
        self.output_datafile = output_datafile

    def compute(self):
        with open(self.output_datafile, 'w') as fout:
            with open(self.input_datafile, 'r') as fin:
                for line in fin:
                    fout.write(f'{line.strip()} out\n')

    def run(self):
        self.compute()
        print(f'finish compute process with {self.input_datafile}')


if __name__ == '__main__':
    num_workers = 4
    print('start compute')
    input_datafiles = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    output_datafiles = ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
    processes = [compute_process(i, o) for i, o in zip(input_datafiles, output_datafiles)]
    for p in processes:
        p.start()
    # for p in processes:
    #     p.join()  # wait for the child processes to finish before the main process continues
    for o in output_datafiles:
        if os.path.exists(o):
            print(f'exists {o}')
        else:
            print(f'not exists {o}')
    print('congratulations finish')
```
Without join, the main process immediately goes on to execute the subsequent code. Output:
```
start compute
not exists ./test/out1.txt
not exists ./test/out2.txt
not exists ./test/out3.txt
not exists ./test/out4.txt
congratulations finish
finish compute process with ./test/1.txt
finish compute process with ./test/2.txt
finish compute process with ./test/3.txt
finish compute process with ./test/4.txt
```
With join enabled, the main process waits for the child processes to finish before continuing:
```
start compute
finish compute process with ./test/2.txt
finish compute process with ./test/3.txt
finish compute process with ./test/1.txt
finish compute process with ./test/4.txt
exists ./test/out1.txt
exists ./test/out2.txt
exists ./test/out3.txt
exists ./test/out4.txt
congratulations finish
```
Multiprocessing also consumes correspondingly more resources, so you can limit the number of concurrent processes according to what your machine can afford:
```python
# _*_ coding: utf-8 _*_
# @Time : 2022/10/16 15:45
# @Author : Michael
# @File : multiprocess_demo.py
# @desc :
import math
import multiprocessing
import os


class compute_process(multiprocessing.Process):
    def __init__(self, input_datafile, output_datafile):
        super(compute_process, self).__init__()
        self.input_datafile = input_datafile
        self.output_datafile = output_datafile

    def compute(self):
        with open(self.output_datafile, 'w') as fout:
            with open(self.input_datafile, 'r') as fin:
                for line in fin:
                    fout.write(f'{line.strip()} out\n')

    def run(self):
        self.compute()
        print(f'finish compute process with {self.input_datafile}')


if __name__ == '__main__':
    num_workers = 3
    print('start compute')
    all_input_datafiles = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    all_output_datafiles = ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
    # process the files in batches of num_workers
    for idx in range(math.ceil(len(all_input_datafiles) / num_workers)):
        input_datafiles = all_input_datafiles[idx * num_workers: (idx + 1) * num_workers]
        output_datafiles = all_output_datafiles[idx * num_workers: (idx + 1) * num_workers]
        processes = [compute_process(i, o) for i, o in zip(input_datafiles, output_datafiles)]
        print(f'idx: {idx}, len of sub processes: {len(processes)}')
        for p in processes:
            p.start()
        for p in processes:
            p.join()  # wait for this batch to finish before starting the next one
    for o in all_output_datafiles:
        if os.path.exists(o):
            print(f'exists {o}')
        else:
            print(f'not exists {o}')
    print('congratulations finish')
```
Output:
```
start compute
idx: 0, len of sub processes: 3
finish compute process with ./test/1.txt
finish compute process with ./test/2.txt
finish compute process with ./test/3.txt
idx: 1, len of sub processes: 1
finish compute process with ./test/4.txt
exists ./test/out1.txt
exists ./test/out2.txt
exists ./test/out3.txt
exists ./test/out4.txt
congratulations finish
```
But the batched for loop above has a problem: each round has to wait for its slowest child process to finish before the next round can start. One way around this, still using bare Process objects, is sketched below.
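A minimal sketch, with a hypothetical `work` function standing in for `compute_process`: poll the running children and start a new one as soon as a slot frees up, so a single slow task no longer blocks the whole batch.

```python
import multiprocessing
import time


def work(name):
    print(f'processing {name}')  # stand-in for the real per-file computation


if __name__ == '__main__':
    num_workers = 3
    tasks = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    running = []
    while tasks or running:
        alive = []
        for p in running:
            if p.is_alive():
                alive.append(p)
            else:
                p.join()  # reap the finished child
        running = alive
        while tasks and len(running) < num_workers:  # top up free slots
            p = multiprocessing.Process(target=work, args=(tasks.pop(0),))
            p.start()
            running.append(p)
        time.sleep(0.1)  # avoid a busy-wait loop
    print('all done')
```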
2. multiprocessing.Pool.starmap
Here I replaced the 1.txt file with one of over a million lines.
```python
# _*_ coding: utf-8 _*_
# @Time : 2022/10/16 18:13
# @Author : Michael
# @File : multiprocess_pool_demo.py
# @desc :
import multiprocessing
import os


def compute_func(input_datafile, output_datafile, sql):
    with open(output_datafile, 'w') as fout:
        with open(input_datafile, 'r') as fin:
            for line in fin:
                fout.write(f'{line.strip()} out\n')
    print(f'finish compute process with {input_datafile} and sql {sql}')
    return output_datafile


if __name__ == '__main__':
    num_workers = 3
    print('start compute')
    all_input_datafiles = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    all_output_datafiles = ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
    sql = ['sql1', 'sql2', 'sql3', 'sql4']
    pool = multiprocessing.Pool(processes=num_workers)
    # the second argument is an iterable of argument tuples, unpacked into compute_func;
    # starmap blocks until every task has finished
    outputs = pool.starmap(compute_func,
                           [(i, o, s) for i, o, s in zip(all_input_datafiles, all_output_datafiles, sql)])
    print('outputs', outputs)
    print('pool finish')
    # pool.close()
    for o in all_output_datafiles:
        if os.path.exists(o):
            print(f'exists {o}')
        else:
            print(f'not exists {o}')
    print('congratulations finish')
```
Output:
```
start compute
finish compute process with ./test/2.txt and sql sql2
finish compute process with ./test/3.txt and sql sql3
finish compute process with ./test/4.txt and sql sql4
finish compute process with ./test/1.txt and sql sql1
outputs ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
pool finish
exists ./test/out1.txt
exists ./test/out2.txt
exists ./test/out3.txt
exists ./test/out4.txt
congratulations finish
```
You can see that 1.txt, the huge file, is the last to finish: while it was still running, the other workers in the pool kept picking up the remaining files, instead of the whole batch waiting on it. The sketch below makes the effect visible.
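A small timing sketch (the sleep durations here are hypothetical stand-ins for files of different sizes, not from the original code): with three workers, one stays on the slow task while the other two work through the short ones, so the total time is roughly that of the longest task rather than a sum of per-batch maxima.

```python
import multiprocessing
import time


def compute_func(name, seconds):
    time.sleep(seconds)  # simulate a file that takes `seconds` to process
    print(f'finish {name} after {seconds}s')
    return name


if __name__ == '__main__':
    tasks = [('1.txt', 4), ('2.txt', 1), ('3.txt', 1), ('4.txt', 1)]
    start = time.time()
    with multiprocessing.Pool(processes=3) as pool:
        pool.starmap(compute_func, tasks)
    # roughly 4s in total: the worker holding 1.txt keeps running while
    # the other two finish the three short tasks between them
    print(f'elapsed: {time.time() - start:.1f}s')
```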
3. multiprocessing.Pool.starmap_async
```python
# _*_ coding: utf-8 _*_
# @Time : 2022/10/16 18:13
# @Author : Michael
# @File : multiprocess_pool_demo.py
# @desc :
import multiprocessing
import os


def compute_func(input_datafile, output_datafile, sql):
    with open(output_datafile, 'w') as fout:
        with open(input_datafile, 'r') as fin:
            for line in fin:
                fout.write(f'{line.strip()} out\n')
    print(f'finish compute process with {input_datafile} and sql {sql}')
    return output_datafile


if __name__ == '__main__':
    num_workers = 3
    print('start compute')
    all_input_datafiles = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    all_output_datafiles = ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
    sql = ['sql1', 'sql2', 'sql3', 'sql4']
    pool = multiprocessing.Pool(processes=num_workers)
    # starmap_async is asynchronous: it returns a MapResult immediately
    outputs_async = pool.starmap_async(compute_func,
                                       [(i, o, s) for i, o, s in zip(all_input_datafiles, all_output_datafiles, sql)])
    print('outputs_async', outputs_async)
    outputs_async = outputs_async.get()  # get() blocks until all child tasks have finished
    print('outputs_async', outputs_async)
    print('pool finish')
    # pool.close()
    # pool.join()  # without get(), you must close() and join() here, or the main process just runs on
    for o in all_output_datafiles:
        if os.path.exists(o):
            print(f'exists {o}')
        else:
            print(f'not exists {o}')
    print('congratulations finish')
```
Output:
```
start compute
outputs_async <multiprocessing.pool.MapResult object at 0x000002D15FC86BE0>
finish compute process with ./test/2.txt and sql sql2
finish compute process with ./test/3.txt and sql sql3
finish compute process with ./test/4.txt and sql sql4
finish compute process with ./test/1.txt and sql sql1
outputs_async ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
pool finish
exists ./test/out1.txt
exists ./test/out2.txt
exists ./test/out3.txt
exists ./test/out4.txt
congratulations finish
```
If you delete the get() line and there is no join() below to wait on the pool, no output files are produced at the end: the main process reaches the end of the script before the tasks run, and the pool's worker processes (which are daemonic) die with it. The safe pattern for that case is sketched below.
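A minimal sketch reusing the same compute_func: when you only care about the side effects and never call get(), call close() and then join() so the main process waits for every queued task. (Note that Pool's with-statement form calls terminate() on exit, so it is not a substitute for close()/join() here.)

```python
import multiprocessing
import os


def compute_func(input_datafile, output_datafile, sql):
    with open(output_datafile, 'w') as fout, open(input_datafile, 'r') as fin:
        for line in fin:
            fout.write(f'{line.strip()} out\n')
    return output_datafile


if __name__ == '__main__':
    inputs = ['./test/1.txt', './test/2.txt']
    outputs = ['./test/out1.txt', './test/out2.txt']
    sqls = ['sql1', 'sql2']
    pool = multiprocessing.Pool(processes=2)
    pool.starmap_async(compute_func, list(zip(inputs, outputs, sqls)))
    pool.close()  # no more tasks will be submitted to the pool
    pool.join()   # block until every queued task has actually run
    print([o for o in outputs if os.path.exists(o)])  # files exist without ever calling get()
```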