当前位置: 首页 > news >正文

Python 多进程处理数据

文章目录

    • 1. multiprocessing.Process
    • 2. multiprocessing.Pool.starmap
    • 3. multiprocessing.Pool.starmap_async

1. multiprocessing.Process

# _*_ coding: utf-8 _*_
# @Time : 2022/10/16 15:45
# @Author : Michael
# @File : multiprocess_demo.py
# @desc :
import multiprocessing
import os
import time


class compute_process(multiprocessing.Process):  # compute worker process
    """Worker process that rewrites one text file into another.

    Every line of ``input_datafile`` is stripped of surrounding whitespace and
    written to ``output_datafile`` followed by `` out`` and a newline.

    Args:
        input_datafile: Path of the text file to read.
        output_datafile: Path of the text file to create/overwrite.
        encoding: Text encoding used for both files. ``None`` (the default)
            keeps ``open()``'s platform/locale default, as before.
    """

    def __init__(self, input_datafile, output_datafile, encoding=None):
        super().__init__()
        self.input_datafile = input_datafile
        self.output_datafile = output_datafile
        self.encoding = encoding

    def compute(self):
        """Transform the input file into the output file.

        Raises:
            OSError: if the input cannot be opened (e.g. FileNotFoundError).
        """
        # Open the input first: a missing/unreadable input then fails before
        # the output file is created or truncated.
        with open(self.input_datafile, 'r', encoding=self.encoding) as fin:
            with open(self.output_datafile, 'w', encoding=self.encoding) as fout:
                for line in fin:
                    fout.write(f'{line.strip()} out\n')

    def run(self):
        """Body executed in the child process once ``start()`` is called."""
        self.compute()
        print(f'finish compute process with {self.input_datafile}')


if __name__ == '__main__':
    print('start compute')
    input_datafiles = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    output_datafiles = ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
    # One child process per input file; all of them are started at once.
    processes = [compute_process(i, o) for i, o in zip(input_datafiles, output_datafiles)]
    for p in processes:
        p.start()
    # Uncomment to make the main process wait for every child before the
    # checks below; without it those checks race the still-running children.
    # for p in processes:
    #     p.join()
    for o in output_datafiles:
        if os.path.exists(o):
            print(f'exists {o}')
        else:
            print(f'not exists {o}')
    print('congratulations finish')

如果没有 join,主进程不会等待子进程,而是直接执行后续代码:
在这里插入图片描述

输出:

start compute
not exists ./test/out1.txt
not exists ./test/out2.txt
not exists ./test/out3.txt
not exists ./test/out4.txt
congratulations finish
finish compute process with ./test/1.txt
finish compute process with ./test/2.txt
finish compute process with ./test/3.txt
finish compute process with ./test/4.txt

如果把 join 那一行的注释打开,主进程就会等待所有子进程结束后才继续执行:

start compute
finish compute process with ./test/2.txt
finish compute process with ./test/3.txt
finish compute process with ./test/1.txt
finish compute process with ./test/4.txt
exists ./test/out1.txt
exists ./test/out2.txt
exists ./test/out3.txt
exists ./test/out4.txt
congratulations finish

多进程也会相应地成倍消耗资源,可以根据机器的资源情况设置进程数量来加以限制,例如下面每批最多只启动 num_workers 个进程:

# _*_ coding: utf-8 _*_
# @Time : 2022/10/16 15:45
# @Author : Michael
# @File : multiprocess_demo.py
# @desc :
import multiprocessing
import os
import time
import math


class compute_process(multiprocessing.Process):
    """Worker process that rewrites one text file into another.

    Every line of ``input_datafile`` is stripped of surrounding whitespace and
    written to ``output_datafile`` followed by `` out`` and a newline.

    Args:
        input_datafile: Path of the text file to read.
        output_datafile: Path of the text file to create/overwrite.
        encoding: Text encoding used for both files. ``None`` (the default)
            keeps ``open()``'s platform/locale default, as before.
    """

    def __init__(self, input_datafile, output_datafile, encoding=None):
        super().__init__()
        self.input_datafile = input_datafile
        self.output_datafile = output_datafile
        self.encoding = encoding

    def compute(self):
        """Transform the input file into the output file.

        Raises:
            OSError: if the input cannot be opened (e.g. FileNotFoundError).
        """
        # Open the input first: a missing/unreadable input then fails before
        # the output file is created or truncated.
        with open(self.input_datafile, 'r', encoding=self.encoding) as fin:
            with open(self.output_datafile, 'w', encoding=self.encoding) as fout:
                for line in fin:
                    fout.write(f'{line.strip()} out\n')

    def run(self):
        """Body executed in the child process once ``start()`` is called."""
        self.compute()
        print(f'finish compute process with {self.input_datafile}')


if __name__ == '__main__':
    num_workers = 3  # maximum number of child processes started per batch
    print('start compute')
    all_input_datafiles = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    all_output_datafiles = ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
    num_batches = math.ceil(len(all_input_datafiles) / num_workers)
    for idx in range(num_batches):
        # The idx-th batch: at most num_workers (input, output) pairs.
        batch = slice(idx * num_workers, (idx + 1) * num_workers)
        processes = [
            compute_process(i, o)
            for i, o in zip(all_input_datafiles[batch], all_output_datafiles[batch])
        ]
        print(f'idx: {idx}, len of sub processes: {len(processes)}')
        for p in processes:
            p.start()
        # Wait for the whole batch before starting the next one, so at most
        # num_workers children run at a time; each batch therefore takes as
        # long as its slowest file.
        for p in processes:
            p.join()
    for o in all_output_datafiles:
        if os.path.exists(o):
            print(f'exists {o}')
        else:
            print(f'not exists {o}')
    print('congratulations finish')

输出:

start compute
idx: 0, len of sub processes: 3
finish compute process with ./test/1.txt
finish compute process with ./test/2.txt
finish compute process with ./test/3.txt
idx: 1, len of sub processes: 1
finish compute process with ./test/4.txt
exists ./test/out1.txt
exists ./test/out2.txt
exists ./test/out3.txt
exists ./test/out4.txt
congratulations finish

但是上面的 for 循环有个问题:每一批都要等其中耗时最长的子进程结束,才能开始下一批;提前结束的进程空出来的名额在此期间无法被利用。

2. multiprocessing.Pool.starmap

为了对比效果,我把 1.txt 换成一个上百万行的大文件。进程池(Pool)中的某个进程一旦空闲,就会立即领取下一个任务,不需要像上面那样等整批进程都结束。

# _*_ coding: utf-8 _*_
# @Time : 2022/10/16 18:13
# @Author : Michael
# @File : multiprocess_pool_demo.py
# @desc :
import multiprocessing
import os

def compute_func(input_datafile, output_datafile, sql, encoding=None):
    """Rewrite ``input_datafile`` into ``output_datafile``, tagging each line.

    Every input line is stripped of surrounding whitespace and written out
    followed by `` out`` and a newline.

    Args:
        input_datafile: Path of the text file to read.
        output_datafile: Path of the text file to create/overwrite.
        sql: Per-task tag; only used in the completion message.
        encoding: Text encoding used for both files. ``None`` (the default)
            keeps ``open()``'s platform/locale default, as before.

    Returns:
        ``output_datafile``, so ``Pool.starmap`` collects the output paths.

    Raises:
        OSError: if the input cannot be opened (e.g. FileNotFoundError).
    """
    # Open the input first: a missing/unreadable input then fails before the
    # output file is created or truncated.
    with open(input_datafile, 'r', encoding=encoding) as fin:
        with open(output_datafile, 'w', encoding=encoding) as fout:
            for line in fin:
                fout.write(f'{line.strip()} out\n')
    print(f'finish compute process with {input_datafile} and sql {sql}')
    return output_datafile


if __name__ == '__main__':
    num_workers = 3
    print('start compute')
    all_input_datafiles = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    all_output_datafiles = ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
    sql = ['sql1', 'sql2', 'sql3', 'sql4']

    # The with-block guarantees the pool is torn down even on error
    # (Pool.__exit__ calls terminate()); the explicit close() + join() lets the
    # workers shut down normally once all tasks are done.
    with multiprocessing.Pool(processes=num_workers) as pool:
        # starmap unpacks each tuple as compute_func(input, output, sql).
        # It blocks until every task has finished and returns the results in
        # submission order, whatever order the tasks actually completed in.
        outputs = pool.starmap(compute_func, list(zip(all_input_datafiles, all_output_datafiles, sql)))
        pool.close()
        pool.join()
    print('outputs', outputs)

    print('pool finish')

    for o in all_output_datafiles:
        if os.path.exists(o):
            print(f'exists {o}')
        else:
            print(f'not exists {o}')
    print('congratulations finish')

输出:

start compute
finish compute process with ./test/2.txt and sql sql2
finish compute process with ./test/3.txt and sql sql3
finish compute process with ./test/4.txt and sql sql4
finish compute process with ./test/1.txt and sql sql1
outputs ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
pool finish
exists ./test/out1.txt
exists ./test/out2.txt
exists ./test/out3.txt
exists ./test/out4.txt
congratulations finish

可以看出,1.txt(大文件)是最后执行完的;2.txt、3.txt、4.txt 不必等它,而是由空闲下来的进程依次处理。

3. multiprocessing.Pool.starmap_async

# _*_ coding: utf-8 _*_
# @Time : 2022/10/16 18:13
# @Author : Michael
# @File : multiprocess_pool_demo.py
# @desc :
import multiprocessing
import os

def compute_func(input_datafile, output_datafile, sql, encoding=None):
    """Rewrite ``input_datafile`` into ``output_datafile``, tagging each line.

    Every input line is stripped of surrounding whitespace and written out
    followed by `` out`` and a newline.

    Args:
        input_datafile: Path of the text file to read.
        output_datafile: Path of the text file to create/overwrite.
        sql: Per-task tag; only used in the completion message.
        encoding: Text encoding used for both files. ``None`` (the default)
            keeps ``open()``'s platform/locale default, as before.

    Returns:
        ``output_datafile``, so ``Pool.starmap_async(...).get()`` yields the
        output paths.

    Raises:
        OSError: if the input cannot be opened (e.g. FileNotFoundError).
    """
    # Open the input first: a missing/unreadable input then fails before the
    # output file is created or truncated.
    with open(input_datafile, 'r', encoding=encoding) as fin:
        with open(output_datafile, 'w', encoding=encoding) as fout:
            for line in fin:
                fout.write(f'{line.strip()} out\n')
    print(f'finish compute process with {input_datafile} and sql {sql}')
    return output_datafile


if __name__ == '__main__':
    num_workers = 3
    print('start compute')
    all_input_datafiles = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    all_output_datafiles = ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
    sql = ['sql1', 'sql2', 'sql3', 'sql4']

    # The with-block guarantees the pool is torn down even on error
    # (Pool.__exit__ calls terminate(), which would also kill unfinished
    # tasks); the explicit close() + join() waits for every task, so the
    # output files are produced even if the get() call below is removed.
    with multiprocessing.Pool(processes=num_workers) as pool:
        # starmap_async returns immediately with a MapResult handle.
        async_result = pool.starmap_async(compute_func, list(zip(all_input_datafiles, all_output_datafiles, sql)))
        print('outputs_async', async_result)
        # get() blocks until every task has finished; results keep submission order.
        outputs = async_result.get()
        print('outputs_async', outputs)
        pool.close()
        pool.join()

    print('pool finish')

    for o in all_output_datafiles:
        if os.path.exists(o):
            print(f'exists {o}')
        else:
            print(f'not exists {o}')
    print('congratulations finish')

输出:

start compute
outputs_async <multiprocessing.pool.MapResult object at 0x000002D15FC86BE0>
finish compute process with ./test/2.txt and sql sql2
finish compute process with ./test/3.txt and sql sql3
finish compute process with ./test/4.txt and sql sql4
finish compute process with ./test/1.txt and sql sql1
outputs_async ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
pool finish
exists ./test/out1.txt
exists ./test/out2.txt
exists ./test/out3.txt
exists ./test/out4.txt
congratulations finish

如果把调用 get 获取结果的那一行删掉,且后面也没有调用 pool.close() 和 pool.join() 等待,主进程会直接执行后续代码,最后不会生成输出文件。

相关文章:

  • 做网站建设工资多少/在百度做广告多少钱
  • 在线做效果图的网站/网络培训网站
  • 做网站都需要年服务费吗/搜索引擎排名竞价
  • 网站建设佰首选金手指二五/百度产品推广怎么收费
  • 专业做二手网站有哪些/怎么优化关键词
  • 石家庄网站建设团队/seo营销外包
  • 线程池7个参数详解
  • 高项 23 项目管理成熟度模型
  • 牛客网之SQL刷题练习——一个实用的网站
  • ts 类型学习
  • PHP基于thinkphp+vue共享单车系统 nodejs前后端分离
  • ubuntu 20.04 qemu linux6.0.1 制作ext4根文件系统
  • 面试总结day3:springboot多环境配置、如何优雅的停止线程、gateway作用应用场景
  • MyBatisPlus之多数据源
  • Hive的表操作3
  • 【Java版oj】移除链表元素
  • java IO流【1】简介、入门
  • Spring框架原理 | IOC/DI | Bean