Live video on the AT Protocol
1package statedb
2
3import (
4 "crypto/sha256"
5 "encoding/binary"
6 "fmt"
7 "sync"
8)
9
10func (state *StatefulDB) GetNamedLock(name string) (func(), error) {
11 switch state.Type {
12 case DBTypeSQLite:
13 return state.getNamedLockSQLite(name)
14 case DBTypePostgres:
15 return state.getNamedLockPostgres(name)
16 }
17 panic("unsupported database type")
18}
19
20func (state *StatefulDB) getNamedLockPostgres(name string) (func(), error) {
21 // we also use a local lock here - whoever is locking wants exclusive access even within the node
22 lock := state.locks.GetLock(name)
23 lock.Lock()
24 // Convert string to sha256 hash and use decimal value for advisory lock
25 h := sha256.Sum256([]byte(name))
26 nameInt := int64(binary.BigEndian.Uint64(h[:8]))
27
28 err := state.DB.Exec("SELECT pg_advisory_lock($1)", nameInt).Error
29 if err != nil {
30 lock.Unlock()
31 return nil, err
32 }
33 return func() {
34 err := state.DB.Exec("SELECT pg_advisory_unlock($1)", nameInt).Error
35 lock.Unlock()
36 if err != nil {
37 // unfortunate, but the risk is that we're holding on to the lock forever,
38 // so it's responsible to crash in this case
39 panic(fmt.Errorf("error unlocking named lock: %w", err))
40 }
41 }, nil
42}
43
44func (state *StatefulDB) getNamedLockSQLite(name string) (func(), error) {
45 lock := state.locks.GetLock(name)
46 lock.Lock()
47 return func() {
48 lock.Unlock()
49 }, nil
50}
51
52// Local mutex implementation for sqlite
53type NamedLocks struct {
54 mu sync.Mutex
55 locks map[string]*sync.Mutex
56}
57
58// NewNamedLocks creates a new NamedLocks instance
59func NewNamedLocks() *NamedLocks {
60 return &NamedLocks{
61 locks: make(map[string]*sync.Mutex),
62 }
63}
64
65// GetLock returns the mutex for the given name, creating it if it doesn't exist
66func (n *NamedLocks) GetLock(name string) *sync.Mutex {
67 n.mu.Lock()
68 defer n.mu.Unlock()
69
70 lock, exists := n.locks[name]
71 if !exists {
72 lock = &sync.Mutex{}
73 n.locks[name] = lock
74 }
75 return lock
76}