Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.8.17 153 lines 3.7 kB view raw
1package c2patypes 2 3import ( 4 "errors" 5 "fmt" 6 "io" 7 8 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 9) 10 11// type Stream interface { 12// // Read a stream of bytes from the stream 13// ReadStream(length uint64) ([]byte, error) 14// // Seek to a position in the stream 15// SeekStream(pos int64, mode uint64) (uint64, error) 16// // Write a stream of bytes to the stream 17// WriteStream(data []byte) (uint64, error) 18// } 19 20// pub enum SeekMode { 21// Start = 0, 22// End = 1, 23// Current = 2, 24// } 25 26const ( 27 SeekModeStart uint64 = 0 28 SeekModeEnd uint64 = 1 29 SeekModeCurrent uint64 = 2 30) 31 32func NewReader(rs io.ReadSeeker) *C2PAStreamReader { 33 return &C2PAStreamReader{ReadSeeker: rs} 34} 35 36func NewWriter(rws io.ReadWriteSeeker) *C2PAStreamWriter { 37 return &C2PAStreamWriter{ReadWriteSeeker: rws} 38} 39 40// Wrapped io.ReadSeeker for passing to Rust. Doesn't write. 41type C2PAStreamReader struct { 42 io.ReadSeeker 43} 44 45func (s *C2PAStreamReader) ReadStream(length uint64) ([]byte, error) { 46 return readStream(s.ReadSeeker, length) 47} 48 49func (s *C2PAStreamReader) SeekStream(pos int64, mode uint64) (uint64, error) { 50 return seekStream(s.ReadSeeker, pos, mode) 51} 52 53func (s *C2PAStreamReader) WriteStream(data []byte) (uint64, error) { 54 return 0, fmt.Errorf("Writing is not implemented for C2PAStreamReader") 55} 56 57// Wrapped io.Writer for passing to Rust. 58type C2PAStreamWriter struct { 59 io.ReadWriteSeeker 60} 61 62func (s *C2PAStreamWriter) ReadStream(length uint64) ([]byte, error) { 63 return readStream(s.ReadWriteSeeker, length) 64} 65 66func (s *C2PAStreamWriter) SeekStream(pos int64, mode uint64) (uint64, error) { 67 return seekStream(s.ReadWriteSeeker, pos, mode) 68} 69 70func (s *C2PAStreamWriter) WriteStream(data []byte) (uint64, error) { 71 return writeStream(s.ReadWriteSeeker, data) 72} 73 74func readStream(r io.ReadSeeker, length uint64) ([]byte, error) { 75 // fmt.Printf("read length=%d\n", length) 76 bs := make([]byte, length) 77 read, err := r.Read(bs) 78 if err != nil { 79 if errors.Is(err, io.EOF) { 80 if read == 0 { 81 // fmt.Printf("read EOF read=%d returning empty?", read) 82 return []byte{}, nil 83 } 84 // partial := bs[read:] 85 // return partial, nil 86 } 87 // fmt.Printf("io error=%s\n", err) 88 return []byte{}, err 89 } 90 if uint64(read) < length { 91 partial := bs[:read] 92 // fmt.Printf("read returning partial read=%d len=%d\n", read, len(partial)) 93 return partial, nil 94 } 95 // fmt.Printf("read returning full read=%d len=%d\n", read, len(bs)) 96 return bs, nil 97} 98 99func seekStream(r io.ReadSeeker, pos int64, mode uint64) (uint64, error) { 100 // fmt.Printf("seek pos=%d\n", pos) 101 var seekMode int 102 if mode == SeekModeCurrent { 103 seekMode = io.SeekCurrent 104 } else if mode == SeekModeStart { 105 seekMode = io.SeekStart 106 } else if mode == SeekModeEnd { 107 seekMode = io.SeekEnd 108 } else { 109 // fmt.Printf("seek mode unsupported mode=%d\n", mode) 110 return 0, fmt.Errorf("unknown seek mode: %d", mode) 111 } 112 newPos, err := r.Seek(pos, seekMode) 113 if err != nil { 114 return 0, err 115 } 116 return uint64(newPos), nil 117} 118 119func writeStream(w io.ReadWriteSeeker, data []byte) (uint64, error) { 120 wrote, err := w.Write(data) 121 if err != nil { 122 return uint64(wrote), err 123 } 124 return uint64(wrote), nil 125} 126 127type ManyStreams struct { 128 Streams []io.ReadSeeker 129 index int 130} 131 132func NewManyStreams() *ManyStreams { 133 return &ManyStreams{Streams: []io.ReadSeeker{}, index: 0} 134} 135 136func (m *ManyStreams) AddStream(stream io.ReadSeeker) { 137 m.Streams = append(m.Streams, stream) 138} 139 140func (m *ManyStreams) Next() *iroh_streamplace.Stream { 141 if m.index >= len(m.Streams) { 142 return nil 143 } 144 stream := m.Streams[m.index] 145 m.index++ 146 var r iroh_streamplace.Stream 147 if rws, ok := stream.(io.ReadWriteSeeker); ok { 148 r = NewWriter(rws) 149 } else { 150 r = NewReader(stream) 151 } 152 return &r 153}