+45
-35
internal/storage/postgres.go
+45
-35
internal/storage/postgres.go
···
8
8
"strings"
9
9
"time"
10
10
11
-
_ "github.com/lib/pq"
11
+
"github.com/jackc/pgx/v5"
12
+
"github.com/jackc/pgx/v5/pgxpool"
13
+
_ "github.com/jackc/pgx/v5/stdlib"
12
14
)
13
15
14
16
type PostgresDB struct {
15
-
db *sql.DB
17
+
db *sql.DB
18
+
pool *pgxpool.Pool // Add this for COPY support
16
19
}
17
20
18
21
func NewPostgresDB(connString string) (*PostgresDB, error) {
19
-
db, err := sql.Open("postgres", connString)
22
+
// Open standard sql.DB (for compatibility)
23
+
db, err := sql.Open("pgx", connString)
20
24
if err != nil {
21
25
return nil, err
22
26
}
23
27
24
28
// Connection pool settings
25
-
db.SetMaxOpenConns(25)
26
-
db.SetMaxIdleConns(10)
29
+
db.SetMaxOpenConns(50)
30
+
db.SetMaxIdleConns(25)
27
31
db.SetConnMaxLifetime(5 * time.Minute)
28
32
db.SetConnMaxIdleTime(2 * time.Minute)
29
33
···
32
36
return nil, fmt.Errorf("failed to ping database: %w", err)
33
37
}
34
38
35
-
return &PostgresDB{db: db}, nil
39
+
// Also create pgx pool for COPY operations
40
+
pool, err := pgxpool.New(context.Background(), connString)
41
+
if err != nil {
42
+
return nil, fmt.Errorf("failed to create pgx pool: %w", err)
43
+
}
44
+
45
+
return &PostgresDB{db: db, pool: pool}, nil
36
46
}
37
47
38
48
func (p *PostgresDB) Close() error {
49
+
if p.pool != nil {
50
+
p.pool.Close()
51
+
}
39
52
return p.db.Close()
40
53
}
41
54
···
920
933
return nil
921
934
}
922
935
923
-
tx, err := p.db.BeginTx(ctx, nil)
936
+
// Acquire a connection from the pool
937
+
conn, err := p.pool.Acquire(ctx)
924
938
if err != nil {
925
939
return err
926
940
}
927
-
defer tx.Rollback()
941
+
defer conn.Release()
942
+
943
+
// Start transaction
944
+
tx, err := conn.Begin(ctx)
945
+
if err != nil {
946
+
return err
947
+
}
948
+
defer tx.Rollback(ctx)
928
949
929
950
// Create temporary table
930
-
_, err = tx.ExecContext(ctx, `
951
+
_, err = tx.Exec(ctx, `
931
952
CREATE TEMP TABLE temp_dids (did TEXT PRIMARY KEY) ON COMMIT DROP
932
953
`)
933
954
if err != nil {
934
955
return err
935
956
}
936
957
937
-
// Bulk insert DIDs into temp table
938
-
batchSize := 5000
939
-
for i := 0; i < len(dids); i += batchSize {
940
-
end := i + batchSize
941
-
if end > len(dids) {
942
-
end = len(dids)
943
-
}
944
-
batch := dids[i:end]
945
-
946
-
placeholders := make([]string, len(batch))
947
-
args := make([]interface{}, len(batch))
948
-
for j, did := range batch {
949
-
placeholders[j] = fmt.Sprintf("($%d)", j+1)
950
-
args[j] = did
951
-
}
952
-
953
-
query := fmt.Sprintf(`INSERT INTO temp_dids VALUES %s ON CONFLICT DO NOTHING`,
954
-
strings.Join(placeholders, ","))
955
-
956
-
if _, err := tx.ExecContext(ctx, query, args...); err != nil {
957
-
return err
958
-
}
958
+
// Use COPY for blazing fast bulk insert
959
+
_, err = tx.Conn().CopyFrom(
960
+
ctx,
961
+
pgx.Identifier{"temp_dids"},
962
+
[]string{"did"},
963
+
pgx.CopyFromSlice(len(dids), func(i int) ([]interface{}, error) {
964
+
return []interface{}{dids[i]}, nil
965
+
}),
966
+
)
967
+
if err != nil {
968
+
return err
959
969
}
960
970
961
971
// Step 1: Insert new DIDs
962
-
_, err = tx.ExecContext(ctx, `
972
+
_, err = tx.Exec(ctx, `
963
973
INSERT INTO dids (did, bundle_numbers, created_at)
964
974
SELECT td.did, $1::jsonb, CURRENT_TIMESTAMP
965
975
FROM temp_dids td
···
970
980
return err
971
981
}
972
982
973
-
// Step 2: Update existing DIDs (only if bundle not already present)
974
-
_, err = tx.ExecContext(ctx, `
983
+
// Step 2: Update existing DIDs
984
+
_, err = tx.Exec(ctx, `
975
985
UPDATE dids
976
986
SET bundle_numbers = bundle_numbers || $1::jsonb
977
987
FROM temp_dids
···
983
993
return err
984
994
}
985
995
986
-
return tx.Commit()
996
+
return tx.Commit(ctx)
987
997
}
988
998
989
999
func (p *PostgresDB) GetTotalDIDCount(ctx context.Context) (int64, error) {