
How To Fix Multithreading/multiprocessing With Dictionaries?

I'm making over 100K calls to an API using two functions: with the first function I reach out to the API and grab the sysinfo (a dict) for each host, then with the second function I grab the IP addresses out of each sysinfo dict.

Solution 1:

You can use threads and a queue to communicate. First start get_ips_from_sysinfo as a single thread to monitor and process any finished sysinfo, storing its output in output_list, then fire all the get_sys_info threads. Be careful not to run out of memory with 100k threads.

from threading import Thread
from queue import Queue

jobs = Queue()     # buffer for sysinfo
output_list = []   # store ips


def get_sys_info(self, host_id, appliance):
    sysinfo = self.hx_request(
        "https://{}:3000//hx/api/v3/hosts/{}/sysinfo".format(appliance, host_id))
    jobs.put(sysinfo)  # add sysinfo to jobs queue
    return sysinfo     # comment out if you don't need it


def get_ips_from_sysinfo(self):
    """Runs continuously until all jobs are finished."""
    while True:
        # get sysinfo from jobs queue; blocks until a new entry arrives
        sysinfo = jobs.get()
        if sysinfo == 'exit':
            print('we are done here')
            break

        sysinfo = sysinfo["data"]
        network_array = sysinfo.get("networkArray", {})
        network_info = network_array.get("networkInfo", [])
        ips = []
        for ni in network_info:
            ip_array = ni.get("ipArray", {})
            ip_info = ip_array.get("ipInfo", [])
            for i in ip_info:
                ips.append(i)
        output_list.append(ips)


if __name__ == "__main__":
    # start our listener thread
    listener = Thread(target=rr.get_ips_from_sysinfo)
    listener.start()

    threads = []
    for i in ids:
        t = Thread(target=rr.get_sys_info, args=(i, appliance))
        threads.append(t)
        t.start()

    # wait for threads to finish, then terminate get_ips_from_sysinfo()
    # by sending the 'exit' flag
    for t in threads:
        t.join()

    jobs.put('exit')
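
If the 100k-thread warning above is a real concern for your host count, one variation (my own sketch, not part of the original answer; fetch_worker and the pool size of 50 are arbitrary choices) is to keep the same jobs queue but feed the host ids to a fixed pool of worker threads through a second queue:

id_queue = Queue()  # host ids waiting to be fetched

def fetch_worker():
    # each worker pulls host ids until it sees the 'exit' sentinel
    while True:
        host_id = id_queue.get()
        if host_id == 'exit':
            break
        rr.get_sys_info(host_id, appliance)  # still puts sysinfo on `jobs`

workers = [Thread(target=fetch_worker) for _ in range(50)]
for w in workers:
    w.start()
for i in ids:
    id_queue.put(i)
for _ in workers:
    id_queue.put('exit')  # one sentinel per worker
for w in workers:
    w.join()
jobs.put('exit')          # finally stop get_ips_from_sysinfo()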

Solution 2:

As @wwii commented, concurrent.futures offers some conveniences that may help you, particularly since this looks like a batch job.

It appears that your performance hit is most likely to come from the network calls, so multithreading is probably more suitable for your use case (here is a comparison with multiprocessing). If not, you can switch the pool from threads to processes while using the same APIs.

from concurrent.futures import ThreadPoolExecutor, as_completed
# You can import ProcessPoolExecutor instead and use the same APIs


def thread_worker(instance, host_id, appliance):
    """Wrapper for your class's `get_sys_info` method"""
    sysinfo = instance.get_sys_info(host_id, appliance)
    return sysinfo, instance


# instantiate the class that contains the methods in your example code
# I will call it `RR`
instances = (RR(*your_args, **your_kwds) for your_args, your_kwds
             in zip(iterable_of_args, iterable_of_kwds))
all_host_ids = another_iterable
all_appliances = still_another_iterable

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=50) as executor:  # assuming 10 threads per core; your example uses 5 processes
        pool = {executor.submit(thread_worker, instance, _id, _app): (_id, _app)
                for instance, _id, _app in zip(instances, all_host_ids, all_appliances)}

        # handle the `sysinfo` dicts as they arrive
        for future in as_completed(pool):
            try:
                _sysinfo, _instance = future.result()
            except Exception as exc:  # just one way of handling exceptions
                # do something
                print(f"{pool[future]} raised {exc}")
            else:
                # enqueue results for parallel processing in a separate stage, or
                # process the results serially
                ips = _instance.get_ips_from_sysinfo(_sysinfo)
                # do something with `ips`

You can streamline this example by refactoring your methods into functions, if indeed they don't make use of state, as seems to be the case in your code.
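
For instance, if both methods can become plain module-level functions, the wrapper and the instance bookkeeping disappear entirely. A minimal sketch under that assumption (fetch_and_extract is a name I made up; the pool size is illustrative):

from concurrent.futures import ThreadPoolExecutor

def fetch_and_extract(args):
    host_id, appliance = args
    sysinfo = get_sys_info(host_id, appliance)  # the first function, now stateless
    return get_ips_from_sysinfo(sysinfo)        # the second function, now stateless

with ThreadPoolExecutor(max_workers=50) as executor:
    # map keeps input order and yields results lazily
    all_ips = list(executor.map(fetch_and_extract, zip(all_host_ids, all_appliances)))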

If extracting the sysinfo data is expensive, you can enqueue the results and in turn feed those to a ProcessPoolExecutor that calls get_ips_from_sysinfo on the queued dicts.
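
A rough sketch of that two-stage layout, assuming the methods have been refactored into module-level functions as suggested above (pool sizes are illustrative):

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=50) as tpool, \
         ProcessPoolExecutor(max_workers=4) as ppool:
        # stage 1: I/O-bound network fetches run in threads
        sysinfos = tpool.map(get_sys_info, all_host_ids, all_appliances)
        # stage 2: CPU-bound extraction runs in separate processes
        ip_lists = list(ppool.map(get_ips_from_sysinfo, sysinfos))

Anything handed to a ProcessPoolExecutor has to be picklable, so module-level functions are the safest choice here.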

Solution 3:

For whatever reason I was a little leery about calling an instance method in numerous threads, but it seems to work. I made this toy example using concurrent.futures; hopefully it mimics your actual situation well enough. It submits 4000 instance method calls to a thread pool of (at most) 500 workers. Playing around with the max_workers value, I found that execution time improvements were pretty linear up to about 1000 workers, then the improvement ratio started tailing off.

import concurrent.futures, time, random

a = [.001 * n for n in range(1, 4001)]


class F:
    def __init__(self, name):
        self.name = f'{name}:{self.__class__.__name__}'

    def apicall(self, n):
        wait = random.choice(a)
        time.sleep(wait)
        return (n, wait, self.name)


f = F('foo')

if __name__ == '__main__':
    nworkers = 500
    with concurrent.futures.ThreadPoolExecutor(nworkers) as executor:
#        t = time.time()
        futures = [executor.submit(f.apicall, n) for n in range(4000)]
        results = [future.result() for future in concurrent.futures.as_completed(futures)]
#        t = time.time() - t
#    q = sum(r[1] for r in results)
#    print(f'# workers:{nworkers} - ratio:{q/t}')

I didn't account for possible Exceptions being thrown during the method call, but the example in the docs is pretty clear on how to handle that.
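
For completeness, the docs' pattern is to let future.result() re-raise inside the as_completed loop; applied to the toy example above:

results = []
for future in concurrent.futures.as_completed(futures):
    try:
        results.append(future.result())
    except Exception as exc:  # result() re-raises whatever apicall threw
        print(f'apicall raised {exc!r}')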

Solution 4:

So... after days of looking at the suggestions on here (thank you so much!!!) and a couple of outside readings (Fluent Python, Ch. 17, and Effective Python: 59 Specific Ways to Write Better Python), here's what I ended up with:

from concurrent.futures import ThreadPoolExecutor
import json

from tqdm import tqdm


def get_ips_from_sysinfo(url):
    sysinfo = lx_request(url)
    sysinfo = sysinfo["data"]
    hostname = sysinfo.get("hostname")
    network_array = sysinfo.get("networkArray", {})
    network_info = network_array.get("networkInfo", [])
    ip_dict = []
    entry = {"hostname": hostname, "ip_addrs": []}
    for ni in network_info:
        ip_array = ni.get("ipArray", {})
        ip_info = ip_array.get("ipInfo", [])
        for ip in ip_info:
            ip_addr = ip.get("ipAddress", None)
            if not ip_addr:
                ip_addr = ip.get("ipv6Address", None)
            if ip_addr is None:
                continue
            if not is_ip_private(ip_addr):
                entry["ip_addrs"].append(ip_addr)
    # only keep hosts that actually have public IPs
    if entry["ip_addrs"]:
        ip_dict.append(entry)
    return ip_dict


urls = get_sys_info(appliance, ids)


def main():
    pool = ThreadPoolExecutor(max_workers=15)
    results = list(tqdm(pool.map(get_ips_from_sysinfo, urls), total=len(urls)))
    with open("ip_array.json", "w+") as f:
        json.dump(results, f, indent=2, sort_keys=True)


main()

*Modified: this works now, hope it helps someone else.
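
One further hardening that may be worth adding (my suggestion, not part of the original post): wrap the per-URL work so a single bad host can't kill the whole batch, and run the pool as a context manager so it shuts down cleanly:

def safe_get_ips(url):
    try:
        return get_ips_from_sysinfo(url)
    except Exception as exc:
        return {"url": url, "error": str(exc)}  # record the failure, keep going

def main():
    with ThreadPoolExecutor(max_workers=15) as pool:
        results = list(tqdm(pool.map(safe_get_ips, urls), total=len(urls)))
    with open("ip_array.json", "w") as f:
        json.dump(results, f, indent=2, sort_keys=True)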
