Live video on the AT Protocol
1package c2patypes
2
3import (
4 "errors"
5 "fmt"
6 "io"
7
8 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
9)
10
11// type Stream interface {
12// // Read a stream of bytes from the stream
13// ReadStream(length uint64) ([]byte, error)
14// // Seek to a position in the stream
15// SeekStream(pos int64, mode uint64) (uint64, error)
16// // Write a stream of bytes to the stream
17// WriteStream(data []byte) (uint64, error)
18// }
19
20// pub enum SeekMode {
21// Start = 0,
22// End = 1,
23// Current = 2,
24// }
25
26const (
27 SeekModeStart uint64 = 0
28 SeekModeEnd uint64 = 1
29 SeekModeCurrent uint64 = 2
30)
31
32func NewReader(rs io.ReadSeeker) *C2PAStreamReader {
33 return &C2PAStreamReader{ReadSeeker: rs}
34}
35
36func NewWriter(rws io.ReadWriteSeeker) *C2PAStreamWriter {
37 return &C2PAStreamWriter{ReadWriteSeeker: rws}
38}
39
40// Wrapped io.ReadSeeker for passing to Rust. Doesn't write.
41type C2PAStreamReader struct {
42 io.ReadSeeker
43}
44
45func (s *C2PAStreamReader) ReadStream(length uint64) ([]byte, error) {
46 return readStream(s.ReadSeeker, length)
47}
48
49func (s *C2PAStreamReader) SeekStream(pos int64, mode uint64) (uint64, error) {
50 return seekStream(s.ReadSeeker, pos, mode)
51}
52
53func (s *C2PAStreamReader) WriteStream(data []byte) (uint64, error) {
54 return 0, fmt.Errorf("Writing is not implemented for C2PAStreamReader")
55}
56
57// Wrapped io.Writer for passing to Rust.
58type C2PAStreamWriter struct {
59 io.ReadWriteSeeker
60}
61
62func (s *C2PAStreamWriter) ReadStream(length uint64) ([]byte, error) {
63 return readStream(s.ReadWriteSeeker, length)
64}
65
66func (s *C2PAStreamWriter) SeekStream(pos int64, mode uint64) (uint64, error) {
67 return seekStream(s.ReadWriteSeeker, pos, mode)
68}
69
70func (s *C2PAStreamWriter) WriteStream(data []byte) (uint64, error) {
71 return writeStream(s.ReadWriteSeeker, data)
72}
73
74func readStream(r io.ReadSeeker, length uint64) ([]byte, error) {
75 // fmt.Printf("read length=%d\n", length)
76 bs := make([]byte, length)
77 read, err := r.Read(bs)
78 if err != nil {
79 if errors.Is(err, io.EOF) {
80 if read == 0 {
81 // fmt.Printf("read EOF read=%d returning empty?", read)
82 return []byte{}, nil
83 }
84 // partial := bs[read:]
85 // return partial, nil
86 }
87 // fmt.Printf("io error=%s\n", err)
88 return []byte{}, err
89 }
90 if uint64(read) < length {
91 partial := bs[:read]
92 // fmt.Printf("read returning partial read=%d len=%d\n", read, len(partial))
93 return partial, nil
94 }
95 // fmt.Printf("read returning full read=%d len=%d\n", read, len(bs))
96 return bs, nil
97}
98
99func seekStream(r io.ReadSeeker, pos int64, mode uint64) (uint64, error) {
100 // fmt.Printf("seek pos=%d\n", pos)
101 var seekMode int
102 if mode == SeekModeCurrent {
103 seekMode = io.SeekCurrent
104 } else if mode == SeekModeStart {
105 seekMode = io.SeekStart
106 } else if mode == SeekModeEnd {
107 seekMode = io.SeekEnd
108 } else {
109 // fmt.Printf("seek mode unsupported mode=%d\n", mode)
110 return 0, fmt.Errorf("unknown seek mode: %d", mode)
111 }
112 newPos, err := r.Seek(pos, seekMode)
113 if err != nil {
114 return 0, err
115 }
116 return uint64(newPos), nil
117}
118
119func writeStream(w io.ReadWriteSeeker, data []byte) (uint64, error) {
120 wrote, err := w.Write(data)
121 if err != nil {
122 return uint64(wrote), err
123 }
124 return uint64(wrote), nil
125}
126
127type ManyStreams struct {
128 Streams []io.ReadSeeker
129 index int
130}
131
132func NewManyStreams() *ManyStreams {
133 return &ManyStreams{Streams: []io.ReadSeeker{}, index: 0}
134}
135
136func (m *ManyStreams) AddStream(stream io.ReadSeeker) {
137 m.Streams = append(m.Streams, stream)
138}
139
140func (m *ManyStreams) Next() *iroh_streamplace.Stream {
141 if m.index >= len(m.Streams) {
142 return nil
143 }
144 stream := m.Streams[m.index]
145 m.index++
146 var r iroh_streamplace.Stream
147 if rws, ok := stream.(io.ReadWriteSeeker); ok {
148 r = NewWriter(rws)
149 } else {
150 r = NewReader(stream)
151 }
152 return &r
153}