如何正確使用 PostgreSQL 來限制任務的多個和/或併發執行
假設您有一個
FOO
可以每分鐘排隊一次的任務,以及一個可以暫停的 50 個工作人員池。隊列暫停 10 分鐘,有 10 個FOO
任務排隊。當隊列恢復時,這 10 個FOO
任務將幾乎同時執行(因為工人比任務多)。
FOO
在這種情況下,我需要確保每分鐘執行的任務不超過 1個(時間可能會有所不同)。使用 Redis 的一種解決方案是利用 Redis atomic 和
TTL
key 的優勢。當FOO
任務啟動時,它會檢查密鑰是否worker:FOO
存在。如果存在,則存在,如果不存在,則將值和 aTTL
設置為最大頻率。如果前面的命令返回 1,這很容易實現使用SETNX worker:FOO whatever
然後使用。TTL worker:FOO
因為
SETNX
是原子的,所以我不會陷入FOO
由於 GET 和 SET 之間的競爭條件而執行兩個任務的情況。現在的問題是:使用 PostgreSQL 達到相同結果的正確方法是什麼?我可以有一個帶有 a
key
和executed_on
時間戳值的表,但是如何確保不會因為檢查記錄和寫入鎖FOO
之間的延遲而同時執行兩個任務?FOO 1
由於您正在嘗試序列化工作,因此我會更新表中的記錄。
CREATE TABLE task_keys ( task varchar(10) primary key, last_executed timestamp with time zone not null, by_worker_id integer ); INSERT INTO task_keys(task, last_executed) VALUES ('FOO', '-infinity');
然後看看你是否可以執行任務:
UPDATE task_keys SET last_executed = current_timestamp, by_worker = $1 WHERE task = 'FOO' AND last_executed < (current_timestamp - INTERVAL '1' MINUTE) RETURNING *;
隔離規則
READ COMMITTED
保證如果此查詢成功更新表並返回一行,則沒有其他查詢可以同時這樣做。對相關task_keys
行進行行鎖。如果另一個UPDATE
嘗試影響同一行,它會等到行鎖被持有事務的送出或回滾釋放……然後它將重新檢查該WHERE
子句。如果其他 tx 已送出,則該WHERE
子句將不再匹配,因此它將影響零行。請參閱有關事務隔離的文件。
如果您需要並發,這會有點棘手。你真正想要的是一個在計時器上重新填充的代幣池,工作人員可以從池中獲取代幣來完成工作。這實際上就是我們在這裡所做的,使用一個令牌 - 所以一個選項是為同一個任務添加更多行並獲取 last_executed 時間戳足夠舊的第一個任務。
但是,整個方法有兩個缺陷:
- 它不知道任務何時完成,因此長時間執行的任務可能會重疊;和
- 它不關心任務是成功還是失敗
要解決這些問題,您需要使用適當的工作隊列實現。這些目前很難在數據庫中正確實現,因此我建議您考慮使用外部消息隊列/工作隊列系統來管理它們。不過,在 PostgreSQL 9.5 中,新
FOR UPDATE SKIP LOCKED
功能將使在數據庫中實現這樣的工作隊列變得非常簡單。順便說一句,對於這類事情,建議鎖定通常是一個不錯的選擇,但它不會幫助您在一定時間後自動使鎖定過期。