Live video on the AT Protocol
at eli/rtmp-push 153 lines 3.7 kB view raw
1package c2patypes 2 3import ( 4 "errors" 5 "fmt" 6 "io" 7 8 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 9) 10 11// type Stream interface { 12// // Read a stream of bytes from the stream 13// ReadStream(length uint64) ([]byte, error) 14// // Seek to a position in the stream 15// SeekStream(pos int64, mode uint64) (uint64, error) 16// // Write a stream of bytes to the stream 17// WriteStream(data []byte) (uint64, error) 18// } 19 20// pub enum SeekMode { 21// Start = 0, 22// End = 1, 23// Current = 2, 24// } 25 26const ( 27 SeekModeStart uint64 = 0 28 SeekModeEnd uint64 = 1 29 SeekModeCurrent uint64 = 2 30) 31 32func NewReader(rs io.ReadSeeker) *C2PAStreamReader { 33 return &C2PAStreamReader{ReadSeeker: rs} 34} 35 36func NewWriter(rws io.ReadWriteSeeker) *C2PAStreamWriter { 37 return &C2PAStreamWriter{ReadWriteSeeker: rws} 38} 39 40// Wrapped io.ReadSeeker for passing to Rust. Doesn't write. 41type C2PAStreamReader struct { 42 io.ReadSeeker 43} 44 45func (s *C2PAStreamReader) ReadStream(length uint64) ([]byte, error) { 46 return readStream(s.ReadSeeker, length) 47} 48 49func (s *C2PAStreamReader) SeekStream(pos int64, mode uint64) (uint64, error) { 50 return seekStream(s.ReadSeeker, pos, mode) 51} 52 53func (s *C2PAStreamReader) WriteStream(data []byte) (uint64, error) { 54 return 0, fmt.Errorf("Writing is not implemented for C2PAStreamReader") 55} 56 57// Wrapped io.Writer for passing to Rust. 58type C2PAStreamWriter struct { 59 io.ReadWriteSeeker 60} 61 62func (s *C2PAStreamWriter) ReadStream(length uint64) ([]byte, error) { 63 return readStream(s.ReadWriteSeeker, length) 64} 65 66func (s *C2PAStreamWriter) SeekStream(pos int64, mode uint64) (uint64, error) { 67 return seekStream(s.ReadWriteSeeker, pos, mode) 68} 69 70func (s *C2PAStreamWriter) WriteStream(data []byte) (uint64, error) { 71 return writeStream(s.ReadWriteSeeker, data) 72} 73 74func readStream(r io.ReadSeeker, length uint64) ([]byte, error) { 75 // fmt.Printf("read length=%d\n", length) 76 bs := make([]byte, length) 77 read, err := r.Read(bs) 78 if err != nil { 79 if errors.Is(err, io.EOF) { 80 if read == 0 { 81 // fmt.Printf("read EOF read=%d returning empty?", read) 82 return []byte{}, nil 83 } 84 // partial := bs[read:] 85 // return partial, nil 86 } 87 // fmt.Printf("io error=%s\n", err) 88 return []byte{}, err 89 } 90 if uint64(read) < length { 91 partial := bs[:read] 92 // fmt.Printf("read returning partial read=%d len=%d\n", read, len(partial)) 93 return partial, nil 94 } 95 // fmt.Printf("read returning full read=%d len=%d\n", read, len(bs)) 96 return bs, nil 97} 98 99func seekStream(r io.ReadSeeker, pos int64, mode uint64) (uint64, error) { 100 // fmt.Printf("seek pos=%d\n", pos) 101 var seekMode int 102 if mode == SeekModeCurrent { 103 seekMode = io.SeekCurrent 104 } else if mode == SeekModeStart { 105 seekMode = io.SeekStart 106 } else if mode == SeekModeEnd { 107 seekMode = io.SeekEnd 108 } else { 109 // fmt.Printf("seek mode unsupported mode=%d\n", mode) 110 return 0, fmt.Errorf("unknown seek mode: %d", mode) 111 } 112 newPos, err := r.Seek(pos, seekMode) 113 if err != nil { 114 return 0, err 115 } 116 return uint64(newPos), nil 117} 118 119func writeStream(w io.ReadWriteSeeker, data []byte) (uint64, error) { 120 wrote, err := w.Write(data) 121 if err != nil { 122 return uint64(wrote), err 123 } 124 return uint64(wrote), nil 125} 126 127type ManyStreams struct { 128 Streams []io.ReadSeeker 129 index int 130} 131 132func NewManyStreams() *ManyStreams { 133 return &ManyStreams{Streams: []io.ReadSeeker{}, index: 0} 134} 135 136func (m *ManyStreams) AddStream(stream io.ReadSeeker) { 137 m.Streams = append(m.Streams, stream) 138} 139 140func (m *ManyStreams) Next() *iroh_streamplace.Stream { 141 if m.index >= len(m.Streams) { 142 return nil 143 } 144 stream := m.Streams[m.index] 145 m.index++ 146 var r iroh_streamplace.Stream 147 if rws, ok := stream.(io.ReadWriteSeeker); ok { 148 r = NewWriter(rws) 149 } else { 150 r = NewReader(stream) 151 } 152 return &r 153}