Skip to content Skip to sidebar Skip to footer

How To Use Multiprocessing In Current Python Application?

I have an application that is reading thousand of files from different directories, it reads them, does some processing with them and then sends the data to a database. I have 1 pr

Solution 1:

I am using multiprocessing where each process has its own connection to the database. I have done the minimal changes to your code in an attempt to process directories in parallel. However, I am not sure if a variable such as subdir_paths is correctly named since the "s" at the end of its name implies that it contains multiple path names.

The reason why it has been suggested that this question is better suited for Code Review is because presumably you have an already-working program and you are only looking for a performance improvement (of course, that applies to a great percentage of the questions posted on SO that are tagged with multiprocessing). That type of question is supposed to be posted on https://codereview.stackexchange.com/.

import mysql.connector
import csv
import os
import time
from datetime import datetime
import ntpath
import configparser
from multiprocessing import Pool, cpu_count

# Load [PATHS] and [DB] settings from the INI file.
# Raw string is required: in a plain string literal '\f' in
# 'C:\Desktop\Energy\file_cfg.ini' is a form-feed escape, silently
# corrupting the path.
config = configparser.ConfigParser()
config.read(r'C:\Desktop\Energy\file_cfg.ini')

source = config['PATHS']['source']    # root directory containing one subdir per antenna
archive = config['PATHS']['archive']  # NOTE(review): read but not used in the visible code

def get_connection():
    """Open and return a new MySQL connection using the [DB] config section.

    Named ``get_connection`` to match the call sites in ``get_mp_list`` and
    ``process_mp`` (the scraped original read ``defget_connnection``).
    Each worker process must call this itself: DB connections cannot be
    shared across processes.
    """
    return mysql.connector.connect(
        host=config['DB']['host'],
        user=config['DB']['user'],
        passwd=config['DB']['passwd'],
        database=config['DB']['database'],
    )

def get_mp_list():
    """Return antenna names present both in the `antenna` table and on disk.

    Side effect: truncates the `microbeats` table so the run starts clean.

    Returns:
        list[str]: antenna names, in DB order, that also exist as
        subdirectories of ``source``.
    """
    mydb = get_connection()
    cursor = mydb.cursor()
    cursor.execute("SELECT * FROM `antenna`")

    # First column of each row is the antenna (measurement point) name.
    mp_mysql = [row[0] for row in cursor.fetchall()]
    mp_server = os.listdir(source)

    # microbeats clean.
    cursor.execute("TRUNCATE TABLE microbeats")
    mydb.commit()
    mydb.close()

    # Set gives O(1) membership instead of scanning the directory list
    # once per DB row.
    on_disk = set(mp_server)
    return [mp for mp in mp_mysql if mp in on_disk]

def process_mp(mp):
    """Load the newest non-empty file of one antenna directory into `microbeats`.

    Runs inside a worker process, so it opens its own DB connection.

    Args:
        mp: antenna name; ``source/mp`` is the directory to scan.
    """
    subdir_paths = os.path.join(source, mp)

    # NOTE(review): this loop overwrites cr_time on every iteration, so only
    # the ctime of the *last* listed file survives — yet below we insert the
    # *newest* file's rows tagged with that timestamp. Preserved as-is;
    # confirm whether cr_time should instead come from the file being read.
    cr_time = None
    for file in os.listdir(subdir_paths):
        file_paths = os.path.join(subdir_paths, file)
        cr_time_s = os.path.getctime(file_paths)
        cr_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(cr_time_s))

    all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
    full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0]  # <----- Control empty files.
    if full_file_paths:
        newest_file_paths = max(full_file_paths, key=os.path.getctime)

        mydb = get_connection()
        cursor = mydb.cursor()
        did_insert = False
        q1 = ("INSERT INTO microbeats"
              "(`antenna`,`datetime`,`system`,`item`,`event`, `status`, `accident`)"
              "VALUES (%s, %s, %s,%s, %s, %s, %s)")
        for file in all_file_paths:
            # Only ingest the newest file, and only once it has been idle
            # for at least 120 s (presumably so half-written files are
            # skipped — TODO confirm).
            if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120:
                with open(file, 'rt') as f:
                    reader = csv.reader(f, delimiter='\t')
                    line_data0 = []
                    next(reader)  # skip the header row
                    for line in reader:
                        # Prefix each row with antenna name and timestamp
                        # to match the 7-column INSERT above.
                        line.insert(0, mp)
                        line.insert(1, cr_time)
                        if line:  # <----- Control empty directories.
                            line_data0.append(line)
                    if line_data0:
                        cursor.executemany(q1, line_data0)
                        did_insert = True
        if did_insert:
            mydb.commit()
        mydb.close()

def main():
    """Process every eligible antenna directory in parallel, one task each.

    Exceptions raised by individual workers are printed and skipped so one
    bad directory does not abort the whole run.
    """
    mp_list = get_mp_list()
    if not mp_list:
        # Pool(0) raises ValueError; nothing to do anyway.
        return
    pool = Pool(min(cpu_count(), len(mp_list)))
    results = pool.imap_unordered(process_mp, mp_list)
    # Drain the iterator manually: a plain `for` would stop at the first
    # task exception, whereas here we report it and keep consuming.
    while True:
        try:
            next(results)
        except StopIteration:
            break
        except BaseException as e:
            print(e)
    pool.close()
    pool.join()

# Mandatory guard with multiprocessing on Windows (spawn start method):
# child processes re-import this module, and without the guard each child
# would recursively create its own pool.
if __name__ == '__main__':
    main()

Post a Comment for "How To Use Multiprocessing In Current Python Application?"