Live video on the AT Protocol
at natb/analytics 144 lines 3.8 kB view raw
1package analytics 2 3import ( 4 "context" 5 "fmt" 6 7 "google.golang.org/grpc" 8 "google.golang.org/grpc/credentials/insecure" 9 pb "stream.place/streamplace/pkg/analytics/pb" 10) 11 12type Client interface { 13 IngestEvents(ctx context.Context, events []*Event) error 14 GetRealtimeStats(ctx context.Context, req *RealtimeStatsRequest) (*RealtimeStatsResponse, error) 15 GetStreamerStats(ctx context.Context, req *StreamerStatsRequest) (*StreamerStatsResponse, error) 16 GetViewerHistory(ctx context.Context, req *ViewerHistoryRequest) (*ViewerHistoryResponse, error) 17 Close() error 18} 19 20type client struct { 21 conn *grpc.ClientConn 22 grpcClient pb.AnalyticsClient 23} 24 25func NewClient(ctx context.Context, endpoint string) (Client, error) { 26 27 conn, err := grpc.NewClient(endpoint, 28 grpc.WithTransportCredentials(insecure.NewCredentials()), 29 ) 30 if err != nil { 31 return nil, fmt.Errorf("failed to connect to analytics service at %s: %w", endpoint, err) 32 } 33 34 return &client{ 35 conn: conn, 36 grpcClient: pb.NewAnalyticsClient(conn), 37 }, nil 38} 39 40func (c *client) IngestEvents(ctx context.Context, events []*Event) error { 41 if len(events) == 0 { 42 return nil 43 } 44 45 pbEvents := make([]*pb.Event, len(events)) 46 for i, e := range events { 47 pbEvents[i] = &pb.Event{ 48 EventId: e.EventID, 49 EventType: e.EventType, 50 DeviceId: e.DeviceID, 51 Did: e.DID, 52 SessionId: e.SessionID, 53 TimestampMs: e.TimestampMs, 54 StreamerDid: e.StreamerDID, 55 StreamId: e.StreamID, 56 PropertiesJson: e.PropertiesJSON, 57 SchemaVersion: uint32(e.SchemaVersion), 58 ClientVersion: e.ClientVersion, 59 Platform: e.Platform, 60 } 61 } 62 63 req := &pb.IngestEventsRequest{ 64 Events: pbEvents, 65 } 66 67 _, err := c.grpcClient.IngestEvents(ctx, req) 68 return err 69} 70 71func (c *client) GetRealtimeStats(ctx context.Context, req *RealtimeStatsRequest) (*RealtimeStatsResponse, error) { 72 pbReq := &pb.RealtimeStatsRequest{ 73 WindowMinutes: req.WindowMinutes, 74 StreamerDid: req.StreamerDid, 75 } 76 77 resp, err := c.grpcClient.GetRealtimeStats(ctx, pbReq) 78 if err != nil { 79 return nil, err 80 } 81 82 return convertRealtimeStatsResponse(resp), nil 83} 84 85func (c *client) GetStreamerStats(ctx context.Context, req *StreamerStatsRequest) (*StreamerStatsResponse, error) { 86 pbReq := &pb.StreamerStatsRequest{ 87 StreamerDid: req.StreamerDid, 88 StartTimeMs: req.StartTimeMs, 89 EndTimeMs: req.EndTimeMs, 90 } 91 92 resp, err := c.grpcClient.GetStreamerStats(ctx, pbReq) 93 if err != nil { 94 return nil, err 95 } 96 97 return convertStreamerStatsResponse(resp), nil 98} 99 100func (c *client) GetViewerHistory(ctx context.Context, req *ViewerHistoryRequest) (*ViewerHistoryResponse, error) { 101 pbReq := &pb.ViewerHistoryRequest{ 102 Did: req.Did, 103 StartTimeMs: req.StartTimeMs, 104 EndTimeMs: req.EndTimeMs, 105 Limit: req.Limit, 106 } 107 108 resp, err := c.grpcClient.GetViewerHistory(ctx, pbReq) 109 if err != nil { 110 return nil, err 111 } 112 113 return convertViewerHistoryResponse(resp), nil 114} 115 116func (c *client) Close() error { 117 if c.conn != nil { 118 return c.conn.Close() 119 } 120 return nil 121} 122 123// NopClient is a no-op implementation of Client for when analytics is disabled 124type nopClient struct{} 125 126func (n *nopClient) IngestEvents(ctx context.Context, events []*Event) error { 127 return nil 128} 129 130func (n *nopClient) GetRealtimeStats(ctx context.Context, req *RealtimeStatsRequest) (*RealtimeStatsResponse, error) { 131 return &RealtimeStatsResponse{}, nil 132} 133 134func (n *nopClient) GetStreamerStats(ctx context.Context, req *StreamerStatsRequest) (*StreamerStatsResponse, error) { 135 return &StreamerStatsResponse{}, nil 136} 137 138func (n *nopClient) GetViewerHistory(ctx context.Context, req *ViewerHistoryRequest) (*ViewerHistoryResponse, error) { 139 return &ViewerHistoryResponse{}, nil 140} 141 142func (n *nopClient) Close() error { 143 return nil 144}