Live video on the AT Protocol
1package analytics
2
3import (
4 "context"
5 "fmt"
6
7 "google.golang.org/grpc"
8 "google.golang.org/grpc/credentials/insecure"
9 pb "stream.place/streamplace/pkg/analytics/pb"
10)
11
12type Client interface {
13 IngestEvents(ctx context.Context, events []*Event) error
14 GetRealtimeStats(ctx context.Context, req *RealtimeStatsRequest) (*RealtimeStatsResponse, error)
15 GetStreamerStats(ctx context.Context, req *StreamerStatsRequest) (*StreamerStatsResponse, error)
16 GetViewerHistory(ctx context.Context, req *ViewerHistoryRequest) (*ViewerHistoryResponse, error)
17 Close() error
18}
19
20type client struct {
21 conn *grpc.ClientConn
22 grpcClient pb.AnalyticsClient
23}
24
25func NewClient(ctx context.Context, endpoint string) (Client, error) {
26
27 conn, err := grpc.NewClient(endpoint,
28 grpc.WithTransportCredentials(insecure.NewCredentials()),
29 )
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to analytics service at %s: %w", endpoint, err)
32 }
33
34 return &client{
35 conn: conn,
36 grpcClient: pb.NewAnalyticsClient(conn),
37 }, nil
38}
39
40func (c *client) IngestEvents(ctx context.Context, events []*Event) error {
41 if len(events) == 0 {
42 return nil
43 }
44
45 pbEvents := make([]*pb.Event, len(events))
46 for i, e := range events {
47 pbEvents[i] = &pb.Event{
48 EventId: e.EventID,
49 EventType: e.EventType,
50 DeviceId: e.DeviceID,
51 Did: e.DID,
52 SessionId: e.SessionID,
53 TimestampMs: e.TimestampMs,
54 StreamerDid: e.StreamerDID,
55 StreamId: e.StreamID,
56 PropertiesJson: e.PropertiesJSON,
57 SchemaVersion: uint32(e.SchemaVersion),
58 ClientVersion: e.ClientVersion,
59 Platform: e.Platform,
60 }
61 }
62
63 req := &pb.IngestEventsRequest{
64 Events: pbEvents,
65 }
66
67 _, err := c.grpcClient.IngestEvents(ctx, req)
68 return err
69}
70
71func (c *client) GetRealtimeStats(ctx context.Context, req *RealtimeStatsRequest) (*RealtimeStatsResponse, error) {
72 pbReq := &pb.RealtimeStatsRequest{
73 WindowMinutes: req.WindowMinutes,
74 StreamerDid: req.StreamerDid,
75 }
76
77 resp, err := c.grpcClient.GetRealtimeStats(ctx, pbReq)
78 if err != nil {
79 return nil, err
80 }
81
82 return convertRealtimeStatsResponse(resp), nil
83}
84
85func (c *client) GetStreamerStats(ctx context.Context, req *StreamerStatsRequest) (*StreamerStatsResponse, error) {
86 pbReq := &pb.StreamerStatsRequest{
87 StreamerDid: req.StreamerDid,
88 StartTimeMs: req.StartTimeMs,
89 EndTimeMs: req.EndTimeMs,
90 }
91
92 resp, err := c.grpcClient.GetStreamerStats(ctx, pbReq)
93 if err != nil {
94 return nil, err
95 }
96
97 return convertStreamerStatsResponse(resp), nil
98}
99
100func (c *client) GetViewerHistory(ctx context.Context, req *ViewerHistoryRequest) (*ViewerHistoryResponse, error) {
101 pbReq := &pb.ViewerHistoryRequest{
102 Did: req.Did,
103 StartTimeMs: req.StartTimeMs,
104 EndTimeMs: req.EndTimeMs,
105 Limit: req.Limit,
106 }
107
108 resp, err := c.grpcClient.GetViewerHistory(ctx, pbReq)
109 if err != nil {
110 return nil, err
111 }
112
113 return convertViewerHistoryResponse(resp), nil
114}
115
116func (c *client) Close() error {
117 if c.conn != nil {
118 return c.conn.Close()
119 }
120 return nil
121}
122
123// NopClient is a no-op implementation of Client for when analytics is disabled
124type nopClient struct{}
125
126func (n *nopClient) IngestEvents(ctx context.Context, events []*Event) error {
127 return nil
128}
129
130func (n *nopClient) GetRealtimeStats(ctx context.Context, req *RealtimeStatsRequest) (*RealtimeStatsResponse, error) {
131 return &RealtimeStatsResponse{}, nil
132}
133
134func (n *nopClient) GetStreamerStats(ctx context.Context, req *StreamerStatsRequest) (*StreamerStatsResponse, error) {
135 return &StreamerStatsResponse{}, nil
136}
137
138func (n *nopClient) GetViewerHistory(ctx context.Context, req *ViewerHistoryRequest) (*ViewerHistoryResponse, error) {
139 return &ViewerHistoryResponse{}, nil
140}
141
142func (n *nopClient) Close() error {
143 return nil
144}