Postgresql

Python Postgres psycopg2 ThreadedConnectionPool 用盡

  • January 19, 2022

我在這裡查看了幾個“太多客戶”相關的主題,但仍然無法解決我的問題,所以我不得不再次詢問這個問題,針對我的具體情況。

基本上,我設置了我的本地 Postgres 伺服器,需要做數万次查詢,所以我使用了 Pythonpsycopg2包。這是我的程式碼:

import psycopg2
import pandas as pd
import numpy as np
from flashtext import KeywordProcessor
from psycopg2.pool import ThreadedConnectionPool
from concurrent.futures import ThreadPoolExecutor

df = pd.DataFrame({'S':['California', 'Ohio', 'Texas'], 'T':['Dispatcher', 'Zookeeper', 'Mechanics']})
# df = pd.concat([df]*10000) # repeat df 10000 times

DSN = "postgresql://User:password@localhost/db"
tcp = ThreadedConnectionPool(1, 800, DSN)

def do_one_query(inputS, inputT):
   conn = tcp.getconn()
   c = conn.cursor()

   q = r"SELECT * from eridata where "State" = 'California' and "Title" = 'Dispatcher' limit 1;"   

   c.execute(q)
   all_results = c.fetchall()
   for row in all_results:
       return row
   tcp.putconn(conn, close=True)

cnt=0
for idx, row in df.iterrows():

   cnt+=1
   with ThreadPoolExecutor(max_workers=1) as pool:
       ret = pool.submit(do_one_query,  row["S"], row["T"])
       print ret.result()
   print cnt

該程式碼使用較小的df. 如果我重複df10000 次,我會收到錯誤消息說connection pool exhausted。我雖然我使用的連接已被這條線關閉:

tcp.putconn(conn, close=True)

但我想實際上他們沒有關閉?我該如何解決這個問題?

不要傳遞參數close=Trueputconn已經“關閉”了連接。想像putconn“將連接放回池中”,這可能是您想要的。

我知道這個問題已經很老了,你不再需要答案了。但為了未來的讀者…

使用池的原因是為了對數據庫連接數進行某種控制,這樣我們就不會使數據庫伺服器過載。在您的情況下,它是 800,這是驚人的大。似乎您將數量增加到超過您的應用程序最多需要的數量,這樣您就不會再收到“Pool Exhausted”異常。

為了解決這個問題,讓我先澄清一下。psycopg ThreadPool 是一個執行緒安全的連接池,但它不控制最大連接數。它只是引發異常。從文件中閱讀:

def getconn(self):
       """Get a connection from the pool.

       If there is no idle connection available, then a new one is opened;
       unless there are already :attr:`.maxconn` connections open, then a
       :class:`PoolError` exception is raised.

       Any connection that is broken, or has been idle for more than
       :attr:`.idle_timeout` seconds, is closed and discarded.
       """

您可能已經註意到,如果達到最大值,則會引發異常。因此,雖然它是一個執行緒安全池,但它無法控制連接數。

解決方案:由於您是多執行緒,因此您需要控制執行緒行為,並且池旁邊的幸運工具是“信號量”:(以下只是我自己的簡化程式碼片段,用於切斷展示目的)

_connPool: Optional[psycopg2.pool.ThreadedConnectionPool] = None
_poolSemaphore = threading.Semaphore(10) #10 is max no of connections in this case

def start():
   try:
       Engine._connPool = psycopg2.pool.ThreadedConnectionPool(1, 10,
                                                               user=DBConnectionInfo.user,
                                                               password=DBConnectionInfo.password,
                                                               host=DBConnectionInfo.host,
                                                               port=DBConnectionInfo.port,
                                                               database=DBConnectionInfo.database)
       print("Engine started")
   except Exception as error:
       print("Engine failed to start. connection pool did not initialize:", error)

def getConnection():
   Engine._poolSemaphore.acquire(blocking=True)
   print("Pool is delivering connection")
   return Engine._connPool.getconn()

def putConnectionBack(conn: psycopg2):
   Engine._connPool.putconn(conn, close=False)
   Engine._poolSemaphore.release()
   print("Pool took back a connection")

上面的程式碼最多打開 10 個連接。信號量允許池為 10 個執行緒(安全地)提供連接,但會阻止其餘執行緒,直到這 10 個執行緒中的一個將連接返回池。就在那時semaphore.release()觸發了 a 並且阻塞的執行緒執行(從它被阻塞的同一點開始)並接收連接。因此,將其視為getConn()從池中藉用連接的過程。請注意,完成後將其放回池中非常重要。你不知道會發生什麼,你可能會問。空閒連接會在一段時間後被池殺死,但它不會觸發您的信號量,並且您的阻塞執行緒不會被觸發恢復工作。換句話說,池工作正常,但您的程式碼會停止。

引用自:https://dba.stackexchange.com/questions/196656