Python Postgres psycopg2 ThreadedConnectionPool 用盡
我在這裡查看了幾個“太多客戶”相關的主題,但仍然無法解決我的問題,所以我不得不再次詢問這個問題,針對我的具體情況。
基本上,我設置了我的本地 Postgres 伺服器,需要做數万次查詢,所以我使用了 Python
psycopg2
包。這是我的程式碼:import psycopg2 import pandas as pd import numpy as np from flashtext import KeywordProcessor from psycopg2.pool import ThreadedConnectionPool from concurrent.futures import ThreadPoolExecutor df = pd.DataFrame({'S':['California', 'Ohio', 'Texas'], 'T':['Dispatcher', 'Zookeeper', 'Mechanics']}) # df = pd.concat([df]*10000) # repeat df 10000 times DSN = "postgresql://User:password@localhost/db" tcp = ThreadedConnectionPool(1, 800, DSN) def do_one_query(inputS, inputT): conn = tcp.getconn() c = conn.cursor() q = r"SELECT * from eridata where "State" = 'California' and "Title" = 'Dispatcher' limit 1;" c.execute(q) all_results = c.fetchall() for row in all_results: return row tcp.putconn(conn, close=True) cnt=0 for idx, row in df.iterrows(): cnt+=1 with ThreadPoolExecutor(max_workers=1) as pool: ret = pool.submit(do_one_query, row["S"], row["T"]) print ret.result() print cnt
該程式碼使用較小的
df
. 如果我重複df
10000 次,我會收到錯誤消息說connection pool exhausted
。我雖然我使用的連接已被這條線關閉:tcp.putconn(conn, close=True)
但我想實際上他們沒有關閉?我該如何解決這個問題?
不要傳遞參數
close=True
。putconn
已經“關閉”了連接。想像putconn
“將連接放回池中”,這可能是您想要的。
我知道這個問題已經很老了,你不再需要答案了。但為了未來的讀者…
使用池的原因是為了對數據庫連接數進行某種控制,這樣我們就不會使數據庫伺服器過載。在您的情況下,它是 800,這是驚人的大。似乎您將數量增加到超過您的應用程序最多需要的數量,這樣您就不會再收到“Pool Exhausted”異常。
為了解決這個問題,讓我先澄清一下。psycopg ThreadPool 是一個執行緒安全的連接池,但它不控制最大連接數。它只是引發異常。從文件中閱讀:
def getconn(self): """Get a connection from the pool. If there is no idle connection available, then a new one is opened; unless there are already :attr:`.maxconn` connections open, then a :class:`PoolError` exception is raised. Any connection that is broken, or has been idle for more than :attr:`.idle_timeout` seconds, is closed and discarded. """
您可能已經註意到,如果達到最大值,則會引發異常。因此,雖然它是一個執行緒安全池,但它無法控制連接數。
解決方案:由於您是多執行緒,因此您需要控制執行緒行為,並且池旁邊的幸運工具是“信號量”:(以下只是我自己的簡化程式碼片段,用於切斷展示目的)
_connPool: Optional[psycopg2.pool.ThreadedConnectionPool] = None _poolSemaphore = threading.Semaphore(10) #10 is max no of connections in this case def start(): try: Engine._connPool = psycopg2.pool.ThreadedConnectionPool(1, 10, user=DBConnectionInfo.user, password=DBConnectionInfo.password, host=DBConnectionInfo.host, port=DBConnectionInfo.port, database=DBConnectionInfo.database) print("Engine started") except Exception as error: print("Engine failed to start. connection pool did not initialize:", error) def getConnection(): Engine._poolSemaphore.acquire(blocking=True) print("Pool is delivering connection") return Engine._connPool.getconn() def putConnectionBack(conn: psycopg2): Engine._connPool.putconn(conn, close=False) Engine._poolSemaphore.release() print("Pool took back a connection")
上面的程式碼最多打開 10 個連接。信號量允許池為 10 個執行緒(安全地)提供連接,但會阻止其餘執行緒,直到這 10 個執行緒中的一個將連接返回池。就在那時
semaphore.release()
觸發了 a 並且阻塞的執行緒執行(從它被阻塞的同一點開始)並接收連接。因此,將其視為getConn()
從池中藉用連接的過程。請注意,完成後將其放回池中非常重要。你不知道會發生什麼,你可能會問。空閒連接會在一段時間後被池殺死,但它不會觸發您的信號量,並且您的阻塞執行緒不會被觸發恢復工作。換句話說,池工作正常,但您的程式碼會停止。