at main 3.5 kB view raw
1package storage 2 3import ( 4 "context" 5 "fmt" 6 "time" 7) 8 9// NewDatabase creates a database connection based on type 10func NewDatabase(dbType, connectionString string) (Database, error) { 11 switch dbType { 12 case "postgres", "postgresql": 13 return NewPostgresDB(connectionString) 14 default: 15 return nil, fmt.Errorf("unsupported database type: %s (supported: sqlite, postgres)", dbType) 16 } 17} 18 19type Database interface { 20 Close() error 21 Migrate() error 22 23 // Endpoint operations 24 UpsertEndpoint(ctx context.Context, endpoint *Endpoint) error 25 GetEndpoint(ctx context.Context, endpoint string, endpointType string) (*Endpoint, error) 26 GetEndpoints(ctx context.Context, filter *EndpointFilter) ([]*Endpoint, error) 27 EndpointExists(ctx context.Context, endpoint string, endpointType string) (bool, error) 28 GetEndpointIDByEndpoint(ctx context.Context, endpoint string, endpointType string) (int64, error) 29 GetEndpointScans(ctx context.Context, endpointID int64, limit int) ([]*EndpointScan, error) 30 UpdateEndpointIPs(ctx context.Context, endpointID int64, ipv4, ipv6 string, resolvedAt time.Time) error 31 SaveEndpointScan(ctx context.Context, scan *EndpointScan) error 32 SetScanRetention(retention int) 33 UpdateEndpointStatus(ctx context.Context, endpointID int64, update *EndpointUpdate) error 34 UpdateEndpointServerDID(ctx context.Context, endpointID int64, serverDID string) error 35 GetDuplicateEndpoints(ctx context.Context) (map[string][]string, error) 36 37 // PDS virtual endpoints (created via JOINs) 38 GetPDSList(ctx context.Context, filter *EndpointFilter) ([]*PDSListItem, error) 39 GetPDSDetail(ctx context.Context, endpoint string) (*PDSDetail, error) 40 GetPDSStats(ctx context.Context) (*PDSStats, error) 41 GetCountryLeaderboard(ctx context.Context) ([]*CountryStats, error) 42 GetVersionStats(ctx context.Context) ([]*VersionStats, error) 43 44 // IP operations (IP as primary key) 45 UpsertIPInfo(ctx context.Context, ip string, ipInfo map[string]interface{}) error 46 GetIPInfo(ctx context.Context, ip string) (*IPInfo, error) 47 ShouldUpdateIPInfo(ctx context.Context, ip string) (exists bool, needsUpdate bool, err error) 48 49 // Cursor operations 50 GetScanCursor(ctx context.Context, source string) (*ScanCursor, error) 51 UpdateScanCursor(ctx context.Context, cursor *ScanCursor) error 52 53 // Metrics 54 StorePLCMetrics(ctx context.Context, metrics *PLCMetrics) error 55 GetPLCMetrics(ctx context.Context, limit int) ([]*PLCMetrics, error) 56 GetEndpointStats(ctx context.Context) (*EndpointStats, error) 57 58 // DID operations 59 UpsertDID(ctx context.Context, did string, bundleNum int, handle, pds string) error 60 UpsertDIDFromMempool(ctx context.Context, did string, handle, pds string) error 61 GetDIDRecord(ctx context.Context, did string) (*DIDRecord, error) 62 GetDIDByHandle(ctx context.Context, handle string) (*DIDRecord, error) // NEW 63 GetGlobalDIDInfo(ctx context.Context, did string) (*GlobalDIDInfo, error) 64 AddBundleDIDs(ctx context.Context, bundleNum int, dids []string) error 65 GetTotalDIDCount(ctx context.Context) (int64, error) 66 67 // PDS Repo operations 68 UpsertPDSRepos(ctx context.Context, endpointID int64, repos []PDSRepoData) error 69 GetPDSRepos(ctx context.Context, endpointID int64, activeOnly bool, limit int, offset int) ([]*PDSRepo, error) 70 GetReposByDID(ctx context.Context, did string) ([]*PDSRepo, error) 71 GetPDSRepoStats(ctx context.Context, endpointID int64) (map[string]interface{}, error) 72 73 // Internal 74 GetTableSizes(ctx context.Context, schema string) ([]TableSizeInfo, error) 75 GetIndexSizes(ctx context.Context, schema string) ([]IndexSizeInfo, error) 76}