Live video on the AT Protocol
at eli/postgres 71 lines 1.7 kB view raw
1package statedb 2 3import ( 4 "crypto/sha256" 5 "encoding/binary" 6 "fmt" 7 "sync" 8) 9 10func (state *StatefulDB) GetNamedLock(name string) (func(), error) { 11 switch state.Type { 12 case DBTypeSQLite: 13 return state.getNamedLockSQLite(name) 14 case DBTypePostgres: 15 return state.getNamedLockPostgres(name) 16 } 17 panic("unsupported database type") 18} 19 20func (state *StatefulDB) getNamedLockPostgres(name string) (func(), error) { 21 // Convert string to sha256 hash and use decimal value for advisory lock 22 h := sha256.Sum256([]byte(name)) 23 nameInt := binary.BigEndian.Uint64(h[:32]) 24 25 err := state.DB.Exec("SELECT pg_advisory_lock($1)", nameInt).Error 26 if err != nil { 27 return nil, err 28 } 29 return func() { 30 err := state.DB.Exec("SELECT pg_advisory_unlock($1)", nameInt).Error 31 if err != nil { 32 // unfortunate, but the risk is that we're holding on to the lock forever, 33 // so it's responsible to crash in this case 34 panic(fmt.Errorf("error unlocking named lock: %w", err)) 35 } 36 }, nil 37} 38 39func (state *StatefulDB) getNamedLockSQLite(name string) (func(), error) { 40 lock := state.locks.GetLock(name) 41 lock.Lock() 42 return func() { 43 lock.Unlock() 44 }, nil 45} 46 47// Local mutex implementation for sqlite 48type NamedLocks struct { 49 mu sync.Mutex 50 locks map[string]*sync.Mutex 51} 52 53// NewNamedLocks creates a new NamedLocks instance 54func NewNamedLocks() *NamedLocks { 55 return &NamedLocks{ 56 locks: make(map[string]*sync.Mutex), 57 } 58} 59 60// GetLock returns the mutex for the given name, creating it if it doesn't exist 61func (n *NamedLocks) GetLock(name string) *sync.Mutex { 62 n.mu.Lock() 63 defer n.mu.Unlock() 64 65 lock, exists := n.locks[name] 66 if !exists { 67 lock = &sync.Mutex{} 68 n.locks[name] = lock 69 } 70 return lock 71}