stream.place / streamplace
Live video on the AT Protocol
model: much faster latest-segment detection
Eli Mallon · 3 months ago
20705ae4 · cb213f8c
+95 −89 · 3 changed files
Changed files:
  pkg/config/config.go
  pkg/model/segment.go
  pkg/model/segment_test.go

pkg/config/config.go  +20 −5
···
 	slogGorm.WithTraceAll(),
 )

+func DisableSQLLogging() {
+	GormLogger = slogGorm.New(
+		slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
+			TimeFormat: time.RFC3339,
+		})),
+	)
+}
+
+func EnableSQLLogging() {
+	GormLogger = slogGorm.New(
+		slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
+			TimeFormat: time.RFC3339,
+		})),
+		slogGorm.WithTraceAll(),
+	)
+}
+
 func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error {
 	err := ff.Parse(
 		fs, args,
···
 		*dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1)
 	}
 	if !cli.SQLLogging {
-		GormLogger = slogGorm.New(
-			slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
-				TimeFormat: time.RFC3339,
-			})),
-		)
+		DisableSQLLogging()
+	} else {
+		EnableSQLLogging()
 	}
 	if cli.XXDeprecatedPublicHost != "" && cli.BroadcasterHost == "" {
 		log.Warn(context.Background(), "public-host is deprecated, use broadcaster-host or server-host instead as appropriate")
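
The config.go change splits the existing logger setup into two exported helpers so other packages can toggle GORM SQL logging at runtime; the perf test added below calls config.DisableSQLLogging() to keep its bulk inserts from tracing every statement. For context, a minimal sketch of how the package-level GormLogger is typically consumed when the database is opened, assuming GORM's standard Config.Logger hook and a SQLite driver (the actual wiring lives elsewhere in the repo and is not shown in this diff):

package config

import (
	"gorm.io/driver/sqlite" // assumed driver; the project may use another
	"gorm.io/gorm"
)

// openDB is a hypothetical helper, not part of this diff: it shows how the
// GormLogger reconfigured by Enable/DisableSQLLogging would normally be
// handed to GORM when the database connection is created.
func openDB(path string) (*gorm.DB, error) {
	return gorm.Open(sqlite.Open(path), &gorm.Config{
		Logger: GormLogger, // whichever logger the helpers last installed
	})
}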
pkg/model/segment.go  +16 −36
···
 	ID            string      `json:"id" gorm:"primaryKey"`
 	SigningKeyDID string      `json:"signingKeyDID" gorm:"column:signing_key_did"`
 	SigningKey    *SigningKey `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"`
-	StartTime     time.Time   `json:"startTime" gorm:"index:latest_segments"`
-	RepoDID       string      `json:"repoDID" gorm:"index:latest_segments;column:repo_did"`
+	StartTime     time.Time   `json:"startTime" gorm:"index:latest_segments,priority:2;index:start_time"`
+	RepoDID       string      `json:"repoDID" gorm:"index:latest_segments,priority:1;column:repo_did"`
 	Repo          *Repo       `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
 	Title         string      `json:"title"`
 	Size          int         `json:"size" gorm:"column:size"`
···

 	err := m.DB.Table("segments").
 		Select("segments.*").
-		Where("id IN (?)",
-			m.DB.Table("segments").
-				Select("id").
-				Where("(repo_did, start_time) IN (?)",
-					m.DB.Table("segments").
-						Select("repo_did, MAX(start_time)").
-						Group("repo_did"))).
+		Where("start_time > ?", thirtySecondsAgo.UTC()).
 		Order("start_time DESC").
-		Joins("JOIN repos ON segments.repo_did = repos.did").
-		Preload("Repo").
 		Find(&segments).Error
-
 	if err != nil {
 		return nil, err
 	}
···
 		return []Segment{}, nil
 	}

-	filteredSegments := []Segment{}
+	segmentMap := make(map[string]Segment)
 	for _, seg := range segments {
-		if seg.StartTime.After(thirtySecondsAgo) {
-			filteredSegments = append(filteredSegments, seg)
+		prev, ok := segmentMap[seg.RepoDID]
+		if !ok {
+			segmentMap[seg.RepoDID] = seg
+		} else {
+			if seg.StartTime.After(prev.StartTime) {
+				segmentMap[seg.RepoDID] = seg
+			}
 		}
 	}

+	filteredSegments := []Segment{}
+	for _, seg := range segmentMap {
+		filteredSegments = append(filteredSegments, seg)
+	}
+
 	return filteredSegments, nil
 }

···
 		return nil, err
 	}
 	return segs, nil
-}
-
-func (m *DBModel) GetLiveUsers() ([]Segment, error) {
-	var liveUsers []Segment
-	thirtySecondsAgo := aqtime.FromTime(time.Now().Add(-30 * time.Second)).Time()
-
-	err := m.DB.Model(&Segment{}).
-		Preload("Repo").
-		Where("start_time >= ?", thirtySecondsAgo).
-		Where("start_time = (SELECT MAX(start_time) FROM segments s2 WHERE s2.repo_did = segments.repo_did)").
-		Order("start_time DESC").
-		Find(&liveUsers).Error
-
-	if err != nil {
-		return nil, err
-	}
-	if liveUsers == nil {
-		return []Segment{}, nil
-	}
-
-	return liveUsers, nil
 }

 func (m *DBModel) GetSegment(id string) (*Segment, error) {
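
segment.go is where the speedup comes from. The old MostRecentSegments asked the database for each repo's newest segment via nested IN subqueries with a GROUP BY over the whole segments table, plus a join and a Preload, and the now-removed GetLiveUsers repeated similar work with a correlated MAX subquery. The new version runs a single range query over the last thirty seconds, served by the new standalone start_time index (the latest_segments composite index also gains an explicit (repo_did, start_time) column order via the priority tags), and picks the newest segment per repo in memory. A rough before/after sketch of the query shapes, for illustration only (the exact SQL GORM emits depends on the dialect and is not part of this diff):

// Before: every call aggregated over the entire segments table.
const oldLatestSegmentsSQL = `
SELECT segments.* FROM segments
JOIN repos ON segments.repo_did = repos.did
WHERE id IN (
  SELECT id FROM segments
  WHERE (repo_did, start_time) IN (
    SELECT repo_did, MAX(start_time) FROM segments GROUP BY repo_did))
ORDER BY start_time DESC`

// After: a range scan served by the new start_time index; the newest
// segment per repo is then selected in Go.
const newLatestSegmentsSQL = `
SELECT segments.* FROM segments
WHERE start_time > ?  -- thirty seconds ago
ORDER BY start_time DESC`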
pkg/model/segment_test.go  +59 −48
···
 package model

-// func TestSegmentCleaner(t *testing.T) {
-// 	db, err := MakeDB(":memory:")
-// 	require.NoError(t, err)
-// 	// Create a model instance
-// 	model := db.(*DBModel)
+import (
+	"fmt"
+	"sync"
+	"testing"
+	"time"

-// 	// Create a repo for testing
-// 	repo := &Repo{
-// 		DID: "did:plc:test123",
-// 	}
-// 	err = model.DB.Create(repo).Error
-// 	require.NoError(t, err)
+	"github.com/stretchr/testify/require"
+	"stream.place/streamplace/pkg/config"
+)

-// 	// Create 100 segments with timestamps 1 hour ago, each one second apart
-// 	baseTime := time.Now().Add(-1 * time.Hour)
-// 	for i := 0; i < 100; i++ {
-// 		segment := &Segment{
-// 			ID:        fmt.Sprintf("segment-%d", i),
-// 			RepoDID:   repo.DID,
-// 			StartTime: baseTime.Add(time.Duration(i) * time.Second),
-// 		}
-// 		err = model.DB.Create(segment).Error
-// 		require.NoError(t, err)
-// 	}
+func TestSegmentPerf(t *testing.T) {
+	config.DisableSQLLogging()
+	// dburl := filepath.Join(t.TempDir(), "test.db")
+	db, err := MakeDB(":memory:")
+	require.NoError(t, err)
+	// Create a model instance
+	model := db.(*DBModel)
+	t.Cleanup(func() {
+		// os.Remove(dburl)
+	})

-// 	// Verify we have 100 segments
-// 	var count int64
-// 	err = model.DB.Model(&Segment{}).Count(&count).Error
-// 	require.NoError(t, err)
-// 	require.Equal(t, int64(100), count)
+	// Create a repo for testing
+	repo := &Repo{
+		DID: "did:plc:test123",
+	}
+	err = model.DB.Create(repo).Error
+	require.NoError(t, err)

-// 	// Run the segment cleaner
-// 	err = model.SegmentCleaner(context.Background())
-// 	require.NoError(t, err)
+	defer config.EnableSQLLogging()
+	// Create 250000 segments with timestamps 1 hour ago, each one second apart
+	wg := sync.WaitGroup{}
+	segCount := 250000
+	wg.Add(segCount)
+	baseTime := time.Now()
+	for i := 0; i < segCount; i++ {
+		segment := &Segment{
+			ID:        fmt.Sprintf("segment-%d", i),
+			RepoDID:   repo.DID,
+			StartTime: baseTime.Add(-time.Duration(i) * time.Second).UTC(),
+		}
+		go func() {
+			defer wg.Done()
+			err = model.DB.Create(segment).Error
+			require.NoError(t, err)
+		}()
+	}
+	wg.Wait()

-// 	// Verify we now have only 10 segments
-// 	err = model.DB.Model(&Segment{}).Count(&count).Error
-// 	require.NoError(t, err)
-// 	require.Equal(t, int64(10), count)
-
-// 	// Verify the remaining segments are the most recent ones
-// 	var segments []Segment
-// 	err = model.DB.Model(&Segment{}).Order("start_time DESC").Find(&segments).Error
-// 	require.NoError(t, err)
-// 	require.Len(t, segments, 10)
-
-// 	// The segments should be the last 10 we created (the most recent ones)
-// 	for i, segment := range segments {
-// 		expectedTime := baseTime.Add(time.Duration(99-i) * time.Second)
-// 		// Allow a small tolerance for time comparison
-// 		require.WithinDuration(t, expectedTime, segment.StartTime, time.Second)
-// 	}
-// }
+	startTime := time.Now()
+	wg = sync.WaitGroup{}
+	runs := 1000
+	wg.Add(runs)
+	for i := 0; i < runs; i++ {
+		go func() {
+			defer wg.Done()
+			_, err := model.MostRecentSegments()
+			require.NoError(t, err)
+			// require.Len(t, segments, 1)
+		}()
+	}
+	wg.Wait()
+	fmt.Printf("Time taken: %s\n", time.Since(startTime))
+	require.Less(t, time.Since(startTime), 10*time.Second)
+}
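
The new perf test exercises the rewritten path end to end: 250000 segments are inserted concurrently, then MostRecentSegments is called 1000 times in parallel and must complete within ten seconds. To run just this test from a checkout (a standard Go toolchain invocation, assuming module defaults):

go test ./pkg/model -run TestSegmentPerf -v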