A modern Music Player Daemon based on Rockbox open source high quality audio player
libadwaita audio rust zig deno mpris rockbox mpd
at master 991 lines 26 kB view raw
1/*************************************************************************** 2 * __________ __ ___. 3 * Open \______ \ ____ ____ | | _\_ |__ _______ ___ 4 * Source | _// _ \_/ ___\| |/ /| __ \ / _ \ \/ / 5 * Jukebox | | ( <_> ) \___| < | \_\ ( <_> > < < 6 * Firmware |____|_ /\____/ \___ >__|_ \|___ /\____/__/\_ \ 7 * \/ \/ \/ \/ \/ 8 * $Id$ 9 * 10 * mpegplayer buffering routines 11 * 12 * Copyright (c) 2007 Michael Sevakis 13 * 14 * This program is free software; you can redistribute it and/or 15 * modify it under the terms of the GNU General Public License 16 * as published by the Free Software Foundation; either version 2 17 * of the License, or (at your option) any later version. 18 * 19 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 20 * KIND, either express or implied. 21 * 22 ****************************************************************************/ 23#include "plugin.h" 24#include "mpegplayer.h" 25#include <system.h> 26 27static struct mutex disk_buf_mtx SHAREDBSS_ATTR; 28static struct event_queue disk_buf_queue SHAREDBSS_ATTR; 29static struct queue_sender_list disk_buf_queue_send SHAREDBSS_ATTR; 30static uint32_t disk_buf_stack[DEFAULT_STACK_SIZE*2/sizeof(uint32_t)]; 31 32struct disk_buf disk_buf SHAREDBSS_ATTR; 33static void *nf_list[MPEGPLAYER_MAX_STREAMS+1]; 34 35static inline void disk_buf_lock(void) 36{ 37 rb->mutex_lock(&disk_buf_mtx); 38} 39 40static inline void disk_buf_unlock(void) 41{ 42 rb->mutex_unlock(&disk_buf_mtx); 43} 44 45static inline void disk_buf_on_clear_data_notify(struct stream_hdr *sh) 46{ 47 DEBUGF("DISK_BUF_CLEAR_DATA_NOTIFY: 0x%02X (cleared)\n", 48 STR_FROM_HDR(sh)->id); 49 list_remove_item(nf_list, sh); 50} 51 52inline bool disk_buf_is_data_ready(struct stream_hdr *sh, 53 ssize_t margin) 54{ 55 /* Data window available? */ 56 off_t right = sh->win_right; 57 58 /* Margins past end-of-file can still return true */ 59 if (right > disk_buf.filesize - margin) 60 right = disk_buf.filesize - margin; 61 62 return sh->win_left >= disk_buf.win_left && 63 right + margin <= disk_buf.win_right; 64} 65 66void dbuf_l2_init(struct dbuf_l2_cache *l2_p) 67{ 68 l2_p->addr = OFF_T_MAX; /* Mark as invalid */ 69} 70 71static int disk_buf_on_data_notify(struct stream_hdr *sh) 72{ 73 DEBUGF("DISK_BUF_DATA_NOTIFY: 0x%02X ", STR_FROM_HDR(sh)->id); 74 75 if (sh->win_left <= sh->win_right) 76 { 77 /* Check if the data is already ready already */ 78 if (disk_buf_is_data_ready(sh, 0)) 79 { 80 /* It was - don't register */ 81 DEBUGF("(was ready)\n" 82 " swl:%jd swr:%jd\n" 83 " dwl:%jd dwr:%jd\n", 84 (intmax_t) sh->win_left, (intmax_t) sh->win_right, 85 (intmax_t) disk_buf.win_left, (intmax_t) disk_buf.win_right); 86 /* Be sure it's not listed though if multiple requests were made */ 87 list_remove_item(nf_list, sh); 88 return DISK_BUF_NOTIFY_OK; 89 } 90 91 switch (disk_buf.state) 92 { 93 case TSTATE_DATA: 94 case TSTATE_BUFFERING: 95 case TSTATE_INIT: 96 disk_buf.state = TSTATE_BUFFERING; 97 list_add_item(nf_list, sh); 98 DEBUGF("(registered)\n" 99 " swl:%jd swr:%jd\n" 100 " dwl:%jd dwr:%jd\n", 101 (intmax_t) sh->win_left, (intmax_t) sh->win_right, 102 (intmax_t) disk_buf.win_left, (intmax_t) disk_buf.win_right); 103 return DISK_BUF_NOTIFY_REGISTERED; 104 } 105 } 106 107 DEBUGF("(error)\n"); 108 return DISK_BUF_NOTIFY_ERROR; 109} 110 111static bool check_data_notifies_callback(struct stream_hdr *sh, void *data) 112{ 113 if (disk_buf_is_data_ready(sh, 0)) 114 { 115 /* Remove from list then post notification - post because send 116 * could result in a wait for each thread to finish resulting 117 * in deadlock */ 118 list_remove_item(nf_list, sh); 119 str_post_msg(STR_FROM_HDR(sh), DISK_BUF_DATA_NOTIFY, 0); 120 DEBUGF("DISK_BUF_DATA_NOTIFY: 0x%02X (notified)\n", 121 STR_FROM_HDR(sh)->id); 122 } 123 124 return true; 125 (void)data; 126} 127 128/* Check registered streams and notify them if their data is available */ 129static inline void check_data_notifies(void) 130{ 131 list_enum_items(nf_list, 132 (list_enum_callback_t)check_data_notifies_callback, 133 NULL); 134} 135 136/* Clear all registered notifications - do not post them */ 137static inline void clear_data_notifies(void) 138{ 139 list_clear_all(nf_list); 140} 141 142/* Background buffering when streaming */ 143static inline void disk_buf_buffer(void) 144{ 145 struct stream_window sw; 146 147 switch (disk_buf.state) 148 { 149 default: 150 { 151 size_t wm; 152 uint32_t time; 153 154 /* Get remaining minimum data based upon the stream closest to the 155 * right edge of the window */ 156 if (!stream_get_window(&sw)) 157 break; 158 159 time = stream_get_ticks(NULL); 160 wm = muldiv_uint32(5*CLOCK_RATE, sw.right - disk_buf.pos_last, 161 time - disk_buf.time_last); 162 wm = MIN(wm, (size_t)disk_buf.size); 163 wm = MAX(wm, DISK_BUF_LOW_WATERMARK); 164 165 disk_buf.time_last = time; 166 disk_buf.pos_last = sw.right; 167 168 /* Fast attack, slow decay */ 169 disk_buf.low_wm = (wm > (size_t)disk_buf.low_wm) ? 170 wm : AVERAGE(disk_buf.low_wm, wm, 16); 171 172#if 0 173 rb->splashf(0, "*%10ld %10ld", disk_buf.low_wm, 174 disk_buf.win_right - sw.right); 175#endif 176 177 if (disk_buf.win_right - sw.right > disk_buf.low_wm) 178 break; 179 180 disk_buf.state = TSTATE_BUFFERING; 181 } /* default: */ 182 183 /* Fall-through */ 184 case TSTATE_BUFFERING: 185 { 186 ssize_t len, n; 187 uint32_t tag, *tag_p; 188 189 /* Limit buffering up to the stream with the least progress */ 190 if (!stream_get_window(&sw)) 191 { 192 disk_buf.state = TSTATE_DATA; 193 rb->storage_sleep(); 194 break; 195 } 196 197 /* Wrap pointer */ 198 if (disk_buf.tail >= disk_buf.end) 199 disk_buf.tail = disk_buf.start; 200 201 len = disk_buf.size - disk_buf.win_right + sw.left; 202 203 if (len < DISK_BUF_PAGE_SIZE) 204 { 205 /* Free space is less than one page */ 206 disk_buf.state = TSTATE_DATA; 207 disk_buf.low_wm = DISK_BUF_LOW_WATERMARK; 208 rb->storage_sleep(); 209 break; 210 } 211 212 len = disk_buf.tail - disk_buf.start; 213 tag = MAP_OFFSET_TO_TAG(disk_buf.win_right); 214 tag_p = &disk_buf.cache[len >> DISK_BUF_PAGE_SHIFT]; 215 216 if (*tag_p != tag) 217 { 218 if (disk_buf.need_seek) 219 { 220 rb->lseek(disk_buf.in_file, disk_buf.win_right, SEEK_SET); 221 disk_buf.need_seek = false; 222 } 223 224 n = rb->read(disk_buf.in_file, disk_buf.tail, DISK_BUF_PAGE_SIZE); 225 226 if (n <= 0) 227 { 228 /* Error or end of stream */ 229 disk_buf.state = TSTATE_EOS; 230 rb->storage_sleep(); 231 break; 232 } 233 234 if (len < DISK_GUARDBUF_SIZE) 235 { 236 /* Autoguard guard-o-rama - maintain guardbuffer coherency */ 237 rb->memcpy(disk_buf.end + len, disk_buf.tail, 238 MIN(DISK_GUARDBUF_SIZE - len, n)); 239 } 240 241 /* Update the cache entry for this page */ 242 *tag_p = tag; 243 } 244 else 245 { 246 /* Skipping a read */ 247 n = MIN(DISK_BUF_PAGE_SIZE, 248 disk_buf.filesize - disk_buf.win_right); 249 disk_buf.need_seek = true; 250 } 251 252 disk_buf.tail += DISK_BUF_PAGE_SIZE; 253 254 /* Keep left edge moving forward */ 255 256 /* Advance right edge in temp variable first, then move 257 * left edge if overflow would occur. This avoids a stream 258 * thinking its data might be available when it actually 259 * may not end up that way after a slide of the window. */ 260 len = disk_buf.win_right + n; 261 262 if (len - disk_buf.win_left > disk_buf.size) 263 disk_buf.win_left += n; 264 265 disk_buf.win_right = len; 266 267 /* Continue buffering until filled or file end */ 268 rb->yield(); 269 } /* TSTATE_BUFFERING: */ 270 271 case TSTATE_EOS: 272 break; 273 } /* end switch */ 274} 275 276static void disk_buf_on_reset(ssize_t pos) 277{ 278 int page; 279 uint32_t tag; 280 off_t anchor; 281 282 disk_buf.state = TSTATE_INIT; 283 disk_buf.status = STREAM_STOPPED; 284 clear_data_notifies(); 285 286 if (pos >= disk_buf.filesize) 287 { 288 /* Anchor on page immediately following the one containing final 289 * data */ 290 anchor = disk_buf.file_pages * DISK_BUF_PAGE_SIZE; 291 disk_buf.win_left = disk_buf.filesize; 292 } 293 else 294 { 295 anchor = pos & ~DISK_BUF_PAGE_MASK; 296 disk_buf.win_left = anchor; 297 } 298 299 /* Collect all valid data already buffered that is contiguous with the 300 * current position - probe to left, then to right */ 301 if (anchor > 0) 302 { 303 page = MAP_OFFSET_TO_PAGE(anchor); 304 tag = MAP_OFFSET_TO_TAG(anchor); 305 306 do 307 { 308 if (--tag, --page < 0) 309 page = disk_buf.pgcount - 1; 310 311 if (disk_buf.cache[page] != tag) 312 break; 313 314 disk_buf.win_left = tag << DISK_BUF_PAGE_SHIFT; 315 } 316 while (tag > 0); 317 } 318 319 if (anchor < disk_buf.filesize) 320 { 321 page = MAP_OFFSET_TO_PAGE(anchor); 322 tag = MAP_OFFSET_TO_TAG(anchor); 323 324 do 325 { 326 if (disk_buf.cache[page] != tag) 327 break; 328 329 if (++tag, ++page >= disk_buf.pgcount) 330 page = 0; 331 332 anchor += DISK_BUF_PAGE_SIZE; 333 } 334 while (anchor < disk_buf.filesize); 335 } 336 337 if (anchor >= disk_buf.filesize) 338 { 339 disk_buf.win_right = disk_buf.filesize; 340 disk_buf.state = TSTATE_EOS; 341 } 342 else 343 { 344 disk_buf.win_right = anchor; 345 } 346 347 disk_buf.tail = disk_buf.start + MAP_OFFSET_TO_BUFFER(anchor); 348 349 DEBUGF("disk buf reset\n" 350 " dwl:%jd dwr:%jd\n", 351 (intmax_t) disk_buf.win_left, (intmax_t) disk_buf.win_right); 352 353 /* Next read position is at right edge */ 354 rb->lseek(disk_buf.in_file, disk_buf.win_right, SEEK_SET); 355 disk_buf.need_seek = false; 356 357 disk_buf_reply_msg(disk_buf.win_right - disk_buf.win_left); 358} 359 360static void disk_buf_on_stop(void) 361{ 362 bool was_buffering = disk_buf.state == TSTATE_BUFFERING; 363 364 disk_buf.state = TSTATE_EOS; 365 disk_buf.status = STREAM_STOPPED; 366 clear_data_notifies(); 367 368 disk_buf_reply_msg(was_buffering); 369} 370 371static void disk_buf_on_play_pause(bool play, bool forcefill) 372{ 373 struct stream_window sw; 374 375 if (disk_buf.state != TSTATE_EOS) 376 { 377 if (forcefill) 378 { 379 /* Force buffer filling to top */ 380 disk_buf.state = TSTATE_BUFFERING; 381 } 382 else if (disk_buf.state != TSTATE_BUFFERING) 383 { 384 /* If not filling already, simply monitor */ 385 disk_buf.state = TSTATE_DATA; 386 } 387 } 388 /* else end of stream - no buffering to do */ 389 390 disk_buf.pos_last = stream_get_window(&sw) ? sw.right : 0; 391 disk_buf.time_last = stream_get_ticks(NULL); 392 393 disk_buf.status = play ? STREAM_PLAYING : STREAM_PAUSED; 394} 395 396static int disk_buf_on_load_range(struct dbuf_range *rng) 397{ 398 uint32_t tag = rng->tag_start; 399 uint32_t tag_end = rng->tag_end; 400 int page = rng->pg_start; 401 402 /* Check if a seek is required */ 403 bool need_seek = rb->lseek(disk_buf.in_file, 0, SEEK_CUR) 404 != (off_t)(tag << DISK_BUF_PAGE_SHIFT); 405 406 do 407 { 408 uint32_t *tag_p = &disk_buf.cache[page]; 409 410 if (*tag_p != tag) 411 { 412 /* Page not cached - load it */ 413 ssize_t o, n; 414 415 if (need_seek) 416 { 417 rb->lseek(disk_buf.in_file, tag << DISK_BUF_PAGE_SHIFT, 418 SEEK_SET); 419 need_seek = false; 420 } 421 422 o = page << DISK_BUF_PAGE_SHIFT; 423 n = rb->read(disk_buf.in_file, disk_buf.start + o, 424 DISK_BUF_PAGE_SIZE); 425 426 if (n < 0) 427 { 428 /* Read error */ 429 return DISK_BUF_NOTIFY_ERROR; 430 } 431 432 if (n == 0) 433 { 434 /* End of file */ 435 break; 436 } 437 438 if (o < DISK_GUARDBUF_SIZE) 439 { 440 /* Autoguard guard-o-rama - maintain guardbuffer coherency */ 441 rb->memcpy(disk_buf.end + o, disk_buf.start + o, 442 MIN(DISK_GUARDBUF_SIZE - o, n)); 443 } 444 445 /* Update the cache entry */ 446 *tag_p = tag; 447 } 448 else 449 { 450 /* Skipping a disk read - must seek on next one */ 451 need_seek = true; 452 } 453 454 if (++page >= disk_buf.pgcount) 455 page = 0; 456 } 457 while (++tag <= tag_end); 458 459 return DISK_BUF_NOTIFY_OK; 460} 461 462static void disk_buf_thread(void) 463{ 464 struct queue_event ev; 465 466 disk_buf.state = TSTATE_EOS; 467 disk_buf.status = STREAM_STOPPED; 468 469 while (1) 470 { 471 if (disk_buf.state != TSTATE_EOS) 472 { 473 /* Poll buffer status and messages */ 474 rb->queue_wait_w_tmo(disk_buf.q, &ev, 475 disk_buf.state == TSTATE_BUFFERING ? 476 0 : HZ/5); 477 } 478 else 479 { 480 /* Sit idle and wait for commands */ 481 rb->queue_wait(disk_buf.q, &ev); 482 } 483 484 switch (ev.id) 485 { 486 case SYS_TIMEOUT: 487 if (disk_buf.state == TSTATE_EOS) 488 break; 489 490 disk_buf_buffer(); 491 492 /* Check for any due notifications if any are pending */ 493 if (*nf_list != NULL) 494 check_data_notifies(); 495 496 /* Still more data left? */ 497 if (disk_buf.state != TSTATE_EOS) 498 continue; 499 500 /* Nope - end of stream */ 501 break; 502 503 case DISK_BUF_CACHE_RANGE: 504 disk_buf_reply_msg(disk_buf_on_load_range( 505 (struct dbuf_range *)ev.data)); 506 break; 507 508 case STREAM_RESET: 509 disk_buf_on_reset(ev.data); 510 break; 511 512 case STREAM_STOP: 513 disk_buf_on_stop(); 514 break; 515 516 case STREAM_PAUSE: 517 case STREAM_PLAY: 518 disk_buf_on_play_pause(ev.id == STREAM_PLAY, ev.data != 0); 519 disk_buf_reply_msg(1); 520 break; 521 522 case STREAM_QUIT: 523 disk_buf.state = TSTATE_EOS; 524 return; 525 526 case DISK_BUF_DATA_NOTIFY: 527 disk_buf_reply_msg(disk_buf_on_data_notify( 528 (struct stream_hdr *)ev.data)); 529 break; 530 531 case DISK_BUF_CLEAR_DATA_NOTIFY: 532 disk_buf_on_clear_data_notify((struct stream_hdr *)ev.data); 533 disk_buf_reply_msg(1); 534 break; 535 } 536 } 537} 538 539/* Caches some data from the current file */ 540static ssize_t disk_buf_probe(off_t start, size_t length, void **p) 541{ 542 off_t end; 543 uint32_t tag, tag_end; 544 int page; 545 546 /* Can't read past end of file */ 547 if (length > (size_t)(disk_buf.filesize - start)) 548 { 549 length = disk_buf.filesize - start; 550 } 551 552 /* Can't cache more than the whole buffer size */ 553 if (length > (size_t)disk_buf.size) 554 { 555 length = disk_buf.size; 556 } 557 /* Zero-length probes permitted */ 558 559 end = start + length; 560 561 /* Prepare the range probe */ 562 tag = MAP_OFFSET_TO_TAG(start); 563 tag_end = MAP_OFFSET_TO_TAG(end); 564 page = MAP_OFFSET_TO_PAGE(start); 565 566 /* If the end is on a page boundary, check one less or an extra 567 * one will be probed */ 568 if (tag_end > tag && (end & DISK_BUF_PAGE_MASK) == 0) 569 { 570 tag_end--; 571 } 572 573 if (p != NULL) 574 { 575 *p = disk_buf.start + (page << DISK_BUF_PAGE_SHIFT) 576 + (start & DISK_BUF_PAGE_MASK); 577 } 578 579 /* Obtain initial load point. If all data was cached, no message is sent 580 * otherwise begin on the first page that is not cached. Since we have to 581 * send the message anyway, the buffering thread will determine what else 582 * requires loading on its end in order to cache the specified range. */ 583 do 584 { 585 if (disk_buf.cache[page] != tag) 586 { 587 static struct dbuf_range rng IBSS_ATTR; 588 intptr_t result; 589 590 DEBUGF("disk_buf: cache miss\n"); 591 rng.tag_start = tag; 592 rng.tag_end = tag_end; 593 rng.pg_start = page; 594 595 result = rb->queue_send(disk_buf.q, DISK_BUF_CACHE_RANGE, 596 (intptr_t)&rng); 597 598 return result == DISK_BUF_NOTIFY_OK ? (ssize_t)length : -1; 599 } 600 601 if (++page >= disk_buf.pgcount) 602 page = 0; 603 } 604 while (++tag <= tag_end); 605 606 return length; 607} 608 609/* Attempt to get a pointer to size bytes on the buffer. Returns real amount of 610 * data available as well as the size of non-wrapped data after *p. */ 611ssize_t _disk_buf_getbuffer(size_t size, void **pp, void **pwrap, 612 size_t *sizewrap) 613{ 614 disk_buf_lock(); 615 616 size = disk_buf_probe(disk_buf.offset, size, pp); 617 618 if (size != (size_t)-1 && pwrap && sizewrap) 619 { 620 uint8_t *p = (uint8_t *)*pp; 621 622 if (p + size > disk_buf.end + DISK_GUARDBUF_SIZE) 623 { 624 /* Return pointer to wraparound and the size of same */ 625 size_t nowrap = (disk_buf.end + DISK_GUARDBUF_SIZE) - p; 626 *pwrap = disk_buf.start + DISK_GUARDBUF_SIZE; 627 *sizewrap = size - nowrap; 628 } 629 else 630 { 631 *pwrap = NULL; 632 *sizewrap = 0; 633 } 634 } 635 636 disk_buf_unlock(); 637 638 return size; 639} 640 641ssize_t _disk_buf_getbuffer_l2(struct dbuf_l2_cache *l2, 642 size_t size, void **pp) 643{ 644 off_t offs; 645 off_t l2_addr; 646 size_t l2_size; 647 void *l2_p; 648 649 if (l2 == NULL) 650 { 651 /* Shouldn't have to check this normally */ 652 DEBUGF("disk_buf_getbuffer_l2: l2 = NULL!\n"); 653 } 654 655 if (size > DISK_BUF_L2_CACHE_SIZE) 656 { 657 /* Asking for too much; just go through L1 */ 658 return disk_buf_getbuffer(size, pp, NULL, NULL); 659 } 660 661 offs = disk_buf.offset; /* Other calls keep this within bounds */ 662 l2_addr = l2->addr; 663 664 if (offs >= l2_addr && offs < l2_addr + DISK_BUF_L2_CACHE_SIZE) 665 { 666 /* Data is in the local buffer */ 667 offs &= DISK_BUF_L2_CACHE_MASK; 668 669 *pp = l2->data + offs; 670 if (offs + size > l2->size) 671 size = l2->size - offs; /* Keep size within file limits */ 672 673 return size; 674 } 675 676 /* Have to probe main buffer */ 677 l2_addr = offs & ~DISK_BUF_L2_CACHE_MASK; 678 l2_size = DISK_BUF_L2_CACHE_SIZE*2; /* 2nd half is a guard buffer */ 679 680 disk_buf_lock(); 681 682 l2_size = disk_buf_probe(l2_addr, l2_size, &l2_p); 683 684 if (l2_size != (size_t)-1) 685 { 686 rb->memcpy(l2->data, l2_p, l2_size); 687 688 l2->addr = l2_addr; 689 l2->size = l2_size; 690 offs -= l2_addr; 691 692 *pp = l2->data + offs; 693 if (offs + size > l2->size) 694 size = l2->size - offs; /* Keep size within file limits */ 695 } 696 else 697 { 698 size = -1; 699 } 700 701 disk_buf_unlock(); 702 703 return size; 704} 705 706 707/* Read size bytes of data into a buffer - advances the buffer pointer 708 * and returns the real size read. */ 709ssize_t disk_buf_read(void *buffer, size_t size) 710{ 711 uint8_t *p; 712 713 disk_buf_lock(); 714 715 size = disk_buf_probe(disk_buf.offset, size, PUN_PTR(void **, &p)); 716 717 if (size != (size_t)-1) 718 { 719 if (p + size > disk_buf.end + DISK_GUARDBUF_SIZE) 720 { 721 /* Read wraps */ 722 size_t nowrap = (disk_buf.end + DISK_GUARDBUF_SIZE) - p; 723 rb->memcpy(buffer, p, nowrap); 724 rb->memcpy(buffer + nowrap, disk_buf.start + DISK_GUARDBUF_SIZE, 725 size - nowrap); 726 } 727 else 728 { 729 /* Read wasn't wrapped or guardbuffer holds it */ 730 rb->memcpy(buffer, p, size); 731 } 732 733 disk_buf.offset += size; 734 } 735 736 disk_buf_unlock(); 737 738 return size; 739} 740 741ssize_t disk_buf_lseek(off_t offset, int whence) 742{ 743 disk_buf_lock(); 744 745 /* The offset returned is the result of the current thread's action and 746 * may be invalidated so a local result is returned and not the value 747 * of disk_buf.offset directly */ 748 switch (whence) 749 { 750 case SEEK_SET: 751 /* offset is just the offset */ 752 break; 753 case SEEK_CUR: 754 offset += disk_buf.offset; 755 break; 756 case SEEK_END: 757 offset = disk_buf.filesize + offset; 758 break; 759 default: 760 disk_buf_unlock(); 761 return -2; /* Invalid request */ 762 } 763 764 if (offset < 0 || offset > disk_buf.filesize) 765 { 766 offset = -3; 767 } 768 else 769 { 770 disk_buf.offset = offset; 771 } 772 773 disk_buf_unlock(); 774 775 return offset; 776} 777 778/* Prepare the buffer to enter the streaming state. Evaluates the available 779 * streaming window. */ 780ssize_t disk_buf_prepare_streaming(off_t pos, size_t len) 781{ 782 disk_buf_lock(); 783 784 if (pos < 0) 785 pos = 0; 786 else if (pos > disk_buf.filesize) 787 pos = disk_buf.filesize; 788 789 DEBUGF("prepare streaming:\n pos:%jd len:%lu\n", 790 (intmax_t) pos, (unsigned long)len); 791 792 pos = disk_buf_lseek(pos, SEEK_SET); 793 len = disk_buf_probe(pos, len, NULL); 794 795 DEBUGF(" probe done: pos:%jd len:%lu\n", 796 (intmax_t) pos, (unsigned long)len); 797 798 len = disk_buf_send_msg(STREAM_RESET, pos); 799 800 disk_buf_unlock(); 801 802 return len; 803} 804 805/* Set the streaming window to an arbitrary position within the file. Makes no 806 * probes to validate data. Use after calling another function to cause data 807 * to be cached and correct values are known. */ 808ssize_t disk_buf_set_streaming_window(off_t left, off_t right) 809{ 810 ssize_t len; 811 812 disk_buf_lock(); 813 814 if (left < 0) 815 left = 0; 816 else if (left > disk_buf.filesize) 817 left = disk_buf.filesize; 818 819 if (left > right) 820 right = left; 821 822 if (right > disk_buf.filesize) 823 right = disk_buf.filesize; 824 825 disk_buf.win_left = left; 826 disk_buf.win_right = right; 827 disk_buf.tail = disk_buf.start + ((right + DISK_BUF_PAGE_SIZE-1) & 828 ~DISK_BUF_PAGE_MASK) % disk_buf.size; 829 830 len = disk_buf.win_right - disk_buf.win_left; 831 832 disk_buf_unlock(); 833 834 return len; 835} 836 837void * disk_buf_offset2ptr(off_t offset) 838{ 839 if (offset < 0) 840 offset = 0; 841 else if (offset > disk_buf.filesize) 842 offset = disk_buf.filesize; 843 844 return disk_buf.start + (offset % disk_buf.size); 845} 846 847void disk_buf_close(void) 848{ 849 disk_buf_lock(); 850 851 if (disk_buf.in_file >= 0) 852 { 853 rb->close(disk_buf.in_file); 854 disk_buf.in_file = -1; 855 856 /* Invalidate entire cache */ 857 rb->memset(disk_buf.cache, 0xff, 858 disk_buf.pgcount*sizeof (*disk_buf.cache)); 859 disk_buf.file_pages = 0; 860 disk_buf.filesize = 0; 861 disk_buf.offset = 0; 862 } 863 864 disk_buf_unlock(); 865} 866 867int disk_buf_open(const char *filename) 868{ 869 int fd; 870 871 disk_buf_lock(); 872 873 disk_buf_close(); 874 875 fd = rb->open(filename, O_RDONLY); 876 877 if (fd >= 0) 878 { 879 ssize_t filesize = rb->filesize(fd); 880 881 if (filesize <= 0) 882 { 883 rb->close(disk_buf.in_file); 884 } 885 else 886 { 887 disk_buf.filesize = filesize; 888 /* Number of file pages rounded up toward +inf */ 889 disk_buf.file_pages = ((size_t)filesize + DISK_BUF_PAGE_SIZE-1) 890 / DISK_BUF_PAGE_SIZE; 891 disk_buf.in_file = fd; 892 } 893 } 894 895 disk_buf_unlock(); 896 897 return fd; 898} 899 900intptr_t disk_buf_send_msg(long id, intptr_t data) 901{ 902 return rb->queue_send(disk_buf.q, id, data); 903} 904 905void disk_buf_post_msg(long id, intptr_t data) 906{ 907 rb->queue_post(disk_buf.q, id, data); 908} 909 910void disk_buf_reply_msg(intptr_t retval) 911{ 912 rb->queue_reply(disk_buf.q, retval); 913} 914 915bool disk_buf_init(void) 916{ 917 disk_buf.thread = 0; 918 919 rb->mutex_init(&disk_buf_mtx); 920 921 disk_buf.q = &disk_buf_queue; 922 rb->queue_init(disk_buf.q, false); 923 924 disk_buf.state = TSTATE_EOS; 925 disk_buf.status = STREAM_STOPPED; 926 927 disk_buf.in_file = -1; 928 disk_buf.filesize = 0; 929 disk_buf.win_left = 0; 930 disk_buf.win_right = 0; 931 disk_buf.time_last = 0; 932 disk_buf.pos_last = 0; 933 disk_buf.low_wm = DISK_BUF_LOW_WATERMARK; 934 935 disk_buf.start = mpeg_malloc_all((size_t*)&disk_buf.size, MPEG_ALLOC_DISKBUF); 936 if (disk_buf.start == NULL) 937 return false; 938 939#if NUM_CORES > 1 940 CACHEALIGN_BUFFER(disk_buf.start, disk_buf.size); 941 disk_buf.start = UNCACHED_ADDR(disk_buf.start); 942#endif 943 disk_buf.size -= DISK_GUARDBUF_SIZE; 944 disk_buf.pgcount = disk_buf.size / DISK_BUF_PAGE_SIZE; 945 946 /* Fit it as tightly as possible */ 947 while (disk_buf.pgcount*(sizeof (*disk_buf.cache) + DISK_BUF_PAGE_SIZE) 948 > (size_t)disk_buf.size) 949 { 950 disk_buf.pgcount--; 951 } 952 953 disk_buf.cache = (typeof (disk_buf.cache))disk_buf.start; 954 disk_buf.start += sizeof (*disk_buf.cache)*disk_buf.pgcount; 955 disk_buf.size = disk_buf.pgcount*DISK_BUF_PAGE_SIZE; 956 disk_buf.end = disk_buf.start + disk_buf.size; 957 disk_buf.tail = disk_buf.start; 958 959 DEBUGF("disk_buf info:\n" 960 " page count: %d\n" 961 " size: %ld\n", 962 disk_buf.pgcount, (long)disk_buf.size); 963 964 rb->memset(disk_buf.cache, 0xff, 965 disk_buf.pgcount*sizeof (*disk_buf.cache)); 966 967 disk_buf.thread = rb->create_thread( 968 disk_buf_thread, disk_buf_stack, sizeof(disk_buf_stack), 0, 969 "mpgbuffer" IF_PRIO(, PRIORITY_BUFFERING) IF_COP(, CPU)); 970 971 rb->queue_enable_queue_send(disk_buf.q, &disk_buf_queue_send, 972 disk_buf.thread); 973 974 if (disk_buf.thread == 0) 975 return false; 976 977 /* Wait for thread to initialize */ 978 disk_buf_send_msg(STREAM_NULL, 0); 979 980 return true; 981} 982 983void disk_buf_exit(void) 984{ 985 if (disk_buf.thread != 0) 986 { 987 rb->queue_post(disk_buf.q, STREAM_QUIT, 0); 988 rb->thread_wait(disk_buf.thread); 989 disk_buf.thread = 0; 990 } 991}