make pg backend acquire schema-based global locks (#2411)

Signed-off-by: ollevche <ollevche@gmail.com>
This commit is contained in:
Oleksandr Levchenkov
2025-01-31 14:21:36 +02:00
committed by GitHub
parent 2f27d7eb90
commit 2a4d81042b
7 changed files with 213 additions and 22 deletions

View File

@@ -9,6 +9,7 @@ import (
"crypto/md5"
"database/sql"
"fmt"
"hash/fnv"
uuid "github.com/hashicorp/go-uuid"
_ "github.com/lib/pq"
@@ -90,15 +91,20 @@ func (c *RemoteClient) Lock(info *statemgr.LockInfo) (string, error) {
return nil
}
// Try to acquire locks for the existing row `id` and the creation lock `-1`.
query := `SELECT %s.id, pg_try_advisory_lock(%s.id), pg_try_advisory_lock(-1) FROM %s.%s WHERE %s.name = $1`
row := c.Client.QueryRow(fmt.Sprintf(query, statesTableName, statesTableName, c.SchemaName, statesTableName, statesTableName), c.Name)
creationLockID := c.composeCreationLockID()
// Try to acquire locks for the existing row `id` and the creation lock.
//nolint:gosec // we only parameterize user passed values
query := fmt.Sprintf(`SELECT %s.id, pg_try_advisory_lock(%s.id), pg_try_advisory_lock(%s) FROM %s.%s WHERE %s.name = $1`,
statesTableName, statesTableName, creationLockID, c.SchemaName, statesTableName, statesTableName)
row := c.Client.QueryRow(query, c.Name)
var pgLockId, didLock, didLockForCreate []byte
err = row.Scan(&pgLockId, &didLock, &didLockForCreate)
switch {
case err == sql.ErrNoRows:
// No rows means we're creating the workspace. Take the creation lock.
innerRow := c.Client.QueryRow(`SELECT pg_try_advisory_lock(-1)`)
innerRow := c.Client.QueryRow(fmt.Sprintf(`SELECT pg_try_advisory_lock(%s)`, creationLockID))
var innerDidLock []byte
err := innerRow.Scan(&innerDidLock)
if err != nil {
@@ -107,20 +113,20 @@ func (c *RemoteClient) Lock(info *statemgr.LockInfo) (string, error) {
if string(innerDidLock) == "false" {
return "", &statemgr.LockError{Info: info, Err: fmt.Errorf("Already locked for workspace creation: %s", c.Name)}
}
info.Path = "-1"
info.Path = creationLockID
case err != nil:
return "", &statemgr.LockError{Info: info, Err: err}
case string(didLock) == "false":
// Existing workspace is already locked. Release the attempted creation lock.
lockUnlock("-1")
_ = lockUnlock(creationLockID)
return "", &statemgr.LockError{Info: info, Err: fmt.Errorf("Workspace is already locked: %s", c.Name)}
case string(didLockForCreate) == "false":
// Someone has the creation lock already. Release the existing workspace because it might not be safe to touch.
lockUnlock(string(pgLockId))
_ = lockUnlock(string(pgLockId))
return "", &statemgr.LockError{Info: info, Err: fmt.Errorf("Cannot lock workspace; already locked for workspace creation: %s", c.Name)}
default:
// Existing workspace is now locked. Release the attempted creation lock.
lockUnlock("-1")
_ = lockUnlock(creationLockID)
info.Path = string(pgLockId)
}
c.info = info
@@ -128,10 +134,6 @@ func (c *RemoteClient) Lock(info *statemgr.LockInfo) (string, error) {
return info.ID, nil
}
func (c *RemoteClient) getLockInfo() (*statemgr.LockInfo, error) {
return c.info, nil
}
func (c *RemoteClient) Unlock(id string) error {
if c.info != nil && c.info.Path != "" {
query := `SELECT pg_advisory_unlock(%s)`
@@ -145,3 +147,9 @@ func (c *RemoteClient) Unlock(id string) error {
}
return nil
}
func (c *RemoteClient) composeCreationLockID() string {
hash := fnv.New32()
hash.Write([]byte(c.SchemaName))
return fmt.Sprintf("%d", int64(hash.Sum32())*-1)
}