Function is executed by only one thread instead of several

1 answer. Tags: python mysql multithreading queue

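I wrote a Python script that imports market data into a MariaDB database. To speed up the import, I decided to use the threading module. First, a function fills a queue with URLs; the import function then takes URLs from the queue, downloads the data and imports it into my database. Unfortunately, the import function seems to be processed by only one thread instead of several.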

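import queue
from threading import Thread

import requests
import ujson
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import mysql.connector as mariadb  # assumption: is_connected() below is a mysql.connector method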

num_threads = 4
threads = []
urls = queue.Queue()

def create_url():

    # ...
    # get list of items
    # ...

    for row in item_list:
        url = 'https://someurl=' + str(row[0])
        urls.put(url)

    return urls


def import_mo(urls):
    station_id = 60003760

    print(worker.getName())

    try:
        mariadb_connection = mariadb.connect(allthedbstuff)
        cursor = mariadb_connection.cursor()

        while (True):
            url = urls.get()
            print("%s processes %s queue# %s" % (worker.getName(), url, urls.qsize()))
            if url is None:
                break
            s = requests.Session()
            retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
            s.mount('https://', HTTPAdapter(max_retries=retries))
            jsonraw = s.get(url)
            jsondata = ujson.loads(jsonraw.text)

            for row in jsondata:
                if (row['location_id'] == station_id):
                    cursor.execute(
                        'INSERT INTO tbl_mo_tmp (order_id) VALUES (%s)', (row['order_id'], ))

                cursor.execute('SELECT order_id from tbl_mo WHERE order_id = %s',
                               (row['order_id'], ))
                exists_mo = cursor.fetchall()

                if len(exists_mo) != 0:
                    # print("updating order#", row['order_id'])
                    cursor.execute('UPDATE tbl_mo SET volume = %s, price = %s WHERE order_id = %s',
                                   (row['volume_remain'], row['price'], row['order_id'], ))
                    mariadb_connection.commit()
                else:
                    if (row['location_id'] == station_id):
                        # print("newly inserting order#", row['order_id'])
                        cursor.execute('INSERT INTO tbl_mo (type_id, order_id, ordertype,volume, price) VALUES (%s,%s,%s,%s,%s)',
                                       (row['type_id'], row['order_id'], row['is_buy_order'], row['volume_remain'], row['price'], ))
                    mariadb_connection.commit()
            urls.task_done()

    except mariadb.Error as error:
        mariadb_connection.rollback()  # rollback if any exception occurred

    finally:
        # closing database connection.
        if mariadb_connection.is_connected():
            cursor.close()
            mariadb_connection.close()

def cleanup_mo():
    # ...
    # do cleanup stuff
    # ...
    pass

create_url()

for i in range(num_threads):
    worker = Thread(target=import_mo, args=(urls,))
    worker.daemon = True
    threads.append(worker)
    worker.start()


for i in range(num_threads):
    urls.put(None)

for worker in threads:
    worker.join()

cleanup_mo()

At the start, the output is:

Thread-1
Thread-2
Thread-3
Thread-4

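This shows that four separate workers are created, but once they enter the while loop, it looks as if only one worker actually processes the URLs taken from the queue: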

Thread-1 processes https://someurl=2 queue# 32
Thread-1 processes https://someurl=3 queue# 31
Thread-1 processes https://someurl=4 queue# 30
Thread-1 processes https://someurl=5 queue# 29
Thread-1 processes https://someurl=6 queue# 28
Thread-1 processes https://someurl=7 queue# 27
Thread-1 processes https://someurl=8 queue# 26
Thread-1 processes https://someurl=9 queue# 25
Thread-1 processes https://someurl=10 qu
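
The repeated Thread-1 can be reproduced without any download or database code. In the stripped-down sketch below, import_mo is a hypothetical stand-in for the original function: like the question's code, it reads the module-level variable worker instead of asking which thread is running, and the main thread rebinds worker in its join loop. The Event and Barrier are additions that exist only to make the run deterministic.

```python
import threading

NUM_THREADS = 4
results = []                     # (name of running thread, name read via global `worker`)
results_lock = threading.Lock()
go = threading.Event()           # set once the main thread has rebound `worker`
all_reported = threading.Barrier(NUM_THREADS)  # keeps every thread alive until all have reported


def import_mo():
    go.wait()
    with results_lock:
        # `worker` here is the module-level variable, looked up when this
        # line runs; it is not necessarily the thread executing this code.
        results.append((threading.current_thread().name, worker.name))
    all_reported.wait()


threads = []
for i in range(NUM_THREADS):
    worker = threading.Thread(target=import_mo, name=f"Thread-{i + 1}")
    threads.append(worker)
    worker.start()

# Mimic the question's join loop, which first rebinds `worker` to threads[0]
# and then blocks on it. Rebinding before releasing the threads makes the
# result deterministic.
worker = threads[0]
go.set()
for worker in threads:
    worker.join()

for actual, via_global in sorted(results):
    print(actual, "logs itself as", via_global)
```

All four threads run, yet every printed line ends in Thread-1: worker.name is evaluated when the line executes, and at that moment the global worker points at the first thread. threading.current_thread() is the reliable way to ask which thread is executing.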
Answer 1

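The name printed inside import_mo comes from the global variable worker, not from the thread that is running the function. Once the main thread reaches the loop for worker in threads: worker.join(), worker is rebound to the first thread and stays that way while the main thread waits for it, so from then on every thread prints Thread-1. The log therefore cannot tell you which thread handled which URL. To print a different "name" for each worker, pass an index to the function: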

def import_mo(i, urls):
    station_id = 60003760

    print('Worker', i)
    # etc
    # later:
        print("Worker %s processes %s queue# %s" % (i, url, urls.qsize()))

And create the threads like this:

for i in range(num_threads):
    worker = Thread(target=import_mo, args=(i,urls,))
    worker.daemon = True
    threads.append(worker)
    worker.start()
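
Passing the index works, but it is not the only option. One alternative (not part of the answer above) is to ask threading.current_thread() for the name of the thread that is actually running. The sketch below keeps the question's queue-plus-sentinel structure but replaces the download and database work with a short time.sleep, so the URLs and timing are placeholders; it also tallies how many URLs each thread handled.

```python
import queue
import threading
import time
from collections import Counter

NUM_THREADS = 4
urls = queue.Queue()
for n in range(2, 14):
    urls.put(f"https://someurl={n}")  # placeholder URLs in the question's format

processed = Counter()             # thread name -> number of URLs it handled
processed_lock = threading.Lock()


def import_mo(urls):
    # current_thread() returns the Thread object running this code, so the
    # name is correct no matter what any global variable points to.
    name = threading.current_thread().name
    while True:
        url = urls.get()
        if url is None:           # sentinel: no more work for this thread
            urls.task_done()
            break
        time.sleep(0.01)          # stand-in for the download + database work
        print(f"{name} processes {url} queue# {urls.qsize()}")
        with processed_lock:
            processed[name] += 1
        urls.task_done()


threads = [threading.Thread(target=import_mo, args=(urls,), daemon=True)
           for _ in range(NUM_THREADS)]
for t in threads:
    t.start()
for _ in range(NUM_THREADS):
    urls.put(None)                # one sentinel per thread, queued after the real URLs
for t in threads:
    t.join()

print(dict(processed))
```

Because time.sleep releases the GIL much as network I/O does, the final tally normally shows the URLs spread over several threads, though the exact split varies from run to run.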
