A modern Music Player Daemon based on Rockbox, the open source, high-quality audio player
libadwaita audio rust zig deno mpris rockbox mpd
/***************************************************************************
 *             __________               __   ___.
 *   Open      \______   \ ____   ____ |  | _\_ |__   _______  ___
 *   Source     |       _//  _ \_/ ___\|  |/ /| __ \ /  _ \  \/  /
 *   Jukebox    |    |   (  <_> )  \___|    < | \_\ (  <_> > <  <
 *   Firmware   |____|_  /\____/ \___  >__|_ \|___  /\____/__/\_ \
 *                     \/            \/     \/    \/            \/
 * $Id$
 *
 * Copyright (C) 2002 by Björn Stenberg
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
 * KIND, either express or implied.
 *
 ****************************************************************************/
#include <string.h>
#include "kernel-internal.h"
#include "queue.h"
#include "general.h"

/* This array holds all queues that are initiated. It is used for broadcast. */
static struct
{
    struct event_queue *queues[MAX_NUM_QUEUES+1];
#ifdef HAVE_CORELOCK_OBJECT
    struct corelock cl;
#endif
} all_queues SHAREDBSS_ATTR;

/****************************************************************************
 * Queue handling stuff
 ****************************************************************************/

#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
/****************************************************************************
 * Sender thread queue structure that aids implementation of priority
 * inheritance on queues because the send list structure is the same as
 * for all other kernel objects:
 *
 * Example state:
 * E0 added with queue_send and removed by thread via queue_wait(_w_tmo)
 * E3 was posted with queue_post
 * 4 events remain enqueued (E1-E4)
 *
 *                                  rd                          wr
 * q->events[]:          |  XX  |  E1  |  E2  |  E3  |  E4  |  XX  |
 * q->send->senders[]:   | NULL |  T1  |  T2  | NULL |  T3  | NULL |
 *                                 \/     \/            \/
 * q->send->list:        0<-|T0|<->|T1|<->|T2|<-------->|T3|->0
 * q->send->curr_sender:     /\
 *
 * Thread has E0 in its own struct queue_event.
 *
 ****************************************************************************/

/* Puts the specified return value in the waiting thread's return value
 * and wakes the thread.
 *
 * A sender should be confirmed to exist before calling, which makes it
 * more efficient to reject the majority of cases that don't need this
 * called.
 */
static void queue_release_sender_inner(
    struct thread_entry * volatile * sender, intptr_t retval)
{
    struct thread_entry *thread = *sender;
    *sender = NULL;          /* Clear slot. */
    thread->retval = retval; /* Assign thread-local return value. */
    wakeup_thread(thread, WAKEUP_RELEASE);
}

static inline void queue_release_sender(
    struct thread_entry * volatile * sender, intptr_t retval)
{
    if(UNLIKELY(*sender))
        queue_release_sender_inner(sender, retval);
}

/* Releases any waiting threads that are queued with queue_send -
 * reply with 0.
 */
static void queue_release_all_senders(struct event_queue *q)
{
    if(q->send)
    {
        unsigned int i;
        for(i = q->read; i != q->write; i++)
        {
            struct thread_entry **spp =
                &q->send->senders[i & QUEUE_LENGTH_MASK];
            queue_release_sender(spp, 0);
        }
    }
}

/* Enables queue_send on the specified queue - caller allocates the extra
 * data structure. Only queues which are taken to be owned by a thread should
 * enable this; an official owner is not compulsory, but one must be
 * specified for priority inheritance to operate.
 *
 * Use of queue_wait(_w_tmo) by multiple threads on a queue using synchronous
 * messages results in an undefined order of message replies or possible
 * default replies if two or more waits happen before a reply is done.
 */
void queue_enable_queue_send(struct event_queue *q,
                             struct queue_sender_list *send,
                             unsigned int owner_id)
{
    int oldlevel = disable_irq_save();
    corelock_lock(&q->cl);

    if(send != NULL && q->send == NULL)
    {
        memset(send, 0, sizeof(*send));
        wait_queue_init(&send->list);
#ifdef HAVE_PRIORITY_SCHEDULING
        blocker_init(&send->blocker);
        if(owner_id != 0)
        {
            send->blocker.thread = __thread_id_entry(owner_id);
            q->blocker_p = &send->blocker;
        }
#endif
        q->send = send;
    }

    corelock_unlock(&q->cl);
    restore_irq(oldlevel);

    (void)owner_id;
}
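
/* Illustrative usage sketch (not part of this file): a single owning thread
 * sets up its queue for synchronous messaging. The queue, sender-list and
 * thread-id names below are hypothetical.
 */
#if 0
static struct event_queue example_queue;
static struct queue_sender_list example_queue_send;

static void example_owner_init(unsigned int owner_thread_id)
{
    queue_init(&example_queue, true);
    /* The sender list is allocated by the caller (statically here); naming
       the owning thread lets priority inheritance act on it when built in. */
    queue_enable_queue_send(&example_queue, &example_queue_send,
                            owner_thread_id);
}
#endif
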
/* Unblock a blocked thread at a given event index */
static inline void queue_do_unblock_sender(struct queue_sender_list *send,
                                           unsigned int i)
{
    if(send)
        queue_release_sender(&send->senders[i], 0);
}

/* Perform the auto-reply sequence */
static inline void queue_do_auto_reply(struct queue_sender_list *send)
{
    if(send)
        queue_release_sender(&send->curr_sender, 0);
}

/* Moves the waiting thread's reference from the senders array to
 * curr_sender, which represents the thread waiting for a response to the
 * last message removed from the queue. This also protects the thread from
 * being bumped due to overflow, which would not be a valid action since its
 * message _is_ being processed at this point. */
static inline void queue_do_fetch_sender(struct queue_sender_list *send,
                                         unsigned int rd)
{
    if(send)
    {
        struct thread_entry **spp = &send->senders[rd];

        if(*spp)
        {
            /* Move thread reference from array to the next thread
               that queue_reply will release */
            send->curr_sender = *spp;
            *spp = NULL;
        }
        /* else message was posted asynchronously with queue_post */
    }
}
#else
/* Empty macros for when synchronous sending is not built in */
#define queue_release_all_senders(q)
#define queue_do_unblock_sender(send, i)
#define queue_do_auto_reply(send)
#define queue_do_fetch_sender(send, rd)
#endif /* HAVE_EXTENDED_MESSAGING_AND_NAME */

static void queue_wake_waiter_inner(struct thread_entry *thread)
{
    wakeup_thread(thread, WAKEUP_DEFAULT);
}

static inline void queue_wake_waiter(struct event_queue *q)
{
    struct thread_entry *thread = WQ_THREAD_FIRST(&q->queue);
    if(thread != NULL)
        queue_wake_waiter_inner(thread);
}

/* Queue must not be available for use during this call */
void queue_init(struct event_queue *q, bool register_queue)
{
    int oldlevel = disable_irq_save();

    if(register_queue)
        corelock_lock(&all_queues.cl);

    corelock_init(&q->cl);
    wait_queue_init(&q->queue);
    /* What garbage is in write is irrelevant because of the masking design -
     * any other functions that empty the queue do this as well so that
     * queue_count and queue_empty return sane values in the case of a
     * concurrent change without locking inside them. */
    q->read = q->write;
#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
    q->send = NULL; /* No message sending by default */
    IF_PRIO( q->blocker_p = NULL; )
#endif

    if(register_queue)
    {
        void **queues = (void **)all_queues.queues;
        void **p = find_array_ptr(queues, q);

        if(p - queues >= MAX_NUM_QUEUES)
        {
            panicf("queue_init->out of queues");
        }

        if(*p == NULL)
        {
            /* Add it to the all_queues array */
            *p = q;
            corelock_unlock(&all_queues.cl);
        }
    }

    restore_irq(oldlevel);
}

/* Queue must not be available for use during this call */
void queue_delete(struct event_queue *q)
{
    int oldlevel = disable_irq_save();
    corelock_lock(&all_queues.cl);
    corelock_lock(&q->cl);

    /* Remove the queue if registered */
    remove_array_ptr((void **)all_queues.queues, q);

    corelock_unlock(&all_queues.cl);

    /* Release thread(s) waiting on queue head */
    wait_queue_wake(&q->queue);

#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
    if(q->send)
    {
        /* Release threads waiting for replies */
        queue_release_all_senders(q);

        /* Reply to any dequeued message waiting for one */
        queue_do_auto_reply(q->send);

        q->send = NULL;
        IF_PRIO( q->blocker_p = NULL; )
    }
#endif

    q->read = q->write;

    corelock_unlock(&q->cl);
    restore_irq(oldlevel);
}

/* NOTE: multiple threads waiting on a queue head cannot have a well-
   defined release order if timeouts are used. If multiple threads must
   access the queue head, use a dispatcher or queue_wait only. */
void queue_wait(struct event_queue *q, struct queue_event *ev)
{
    int oldlevel;
    unsigned int rd;

#ifdef HAVE_PRIORITY_SCHEDULING
    KERNEL_ASSERT(QUEUE_GET_THREAD(q) == NULL ||
                  QUEUE_GET_THREAD(q) == __running_self_entry(),
                  "queue_wait->wrong thread\n");
#endif

    oldlevel = disable_irq_save();

    ASSERT_CPU_MODE(CPU_MODE_THREAD_CONTEXT, oldlevel);

    corelock_lock(&q->cl);

#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
    /* Auto-reply (even if ev is NULL to avoid stalling a waiting thread) */
    queue_do_auto_reply(q->send);
#endif

    while(1)
    {
        rd = q->read;
        if (rd != q->write) /* A waking message could disappear */
            break;

        struct thread_entry *current = __running_self_entry();
        block_thread(current, TIMEOUT_BLOCK, &q->queue, NULL);

        corelock_unlock(&q->cl);
        switch_thread();

        disable_irq();
        corelock_lock(&q->cl);
    }

#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
    if(ev)
#endif
    {
        q->read = rd + 1;
        rd &= QUEUE_LENGTH_MASK;
        *ev = q->events[rd];

        /* Get data for a waiting thread if one */
        queue_do_fetch_sender(q->send, rd);
    }
    /* else just waiting on non-empty */

    corelock_unlock(&q->cl);
    restore_irq(oldlevel);
}
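
/* Illustrative usage sketch (not part of this file): the typical consumer
 * loop a thread runs over its own queue. The event ids and handler below are
 * hypothetical.
 */
#if 0
enum { EXAMPLE_EV_PING = 1, EXAMPLE_EV_STOP }; /* hypothetical event ids */

static void example_consumer_thread(void)
{
    static struct event_queue q;
    struct queue_event ev;

    queue_init(&q, true); /* register so queue_broadcast() reaches it */

    while(1)
    {
        queue_wait(&q, &ev); /* blocks until an event is available */

        switch(ev.id)
        {
        case EXAMPLE_EV_PING:
            /* ev.data carries the intptr_t supplied by the poster */
            break;
        case EXAMPLE_EV_STOP:
            return;
        }
    }
}
#endif
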
void queue_wait_w_tmo(struct event_queue *q, struct queue_event *ev, int ticks)
{
    int oldlevel;
    unsigned int rd, wr;

#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
    KERNEL_ASSERT(QUEUE_GET_THREAD(q) == NULL ||
                  QUEUE_GET_THREAD(q) == __running_self_entry(),
                  "queue_wait_w_tmo->wrong thread\n");
#endif

    oldlevel = disable_irq_save();

    corelock_lock(&q->cl);

#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
    /* Auto-reply (even if ev is NULL to avoid stalling a waiting thread) */
    queue_do_auto_reply(q->send);
#endif

    rd = q->read;
    wr = q->write;

    if(rd != wr || ticks == 0)
        ; /* no block */
    else while(1)
    {
        ASSERT_CPU_MODE(CPU_MODE_THREAD_CONTEXT, oldlevel);

        struct thread_entry *current = __running_self_entry();
        block_thread(current, ticks, &q->queue, NULL);

        corelock_unlock(&q->cl);
        switch_thread();

        disable_irq();
        corelock_lock(&q->cl);

        rd = q->read;
        wr = q->write;

        if(rd != wr)
            break;

        if(ticks < 0)
            continue; /* empty again, infinite block */

        /* timeout is legit if thread is still queued and awake */
        if(LIKELY(wait_queue_try_remove(current)))
            break;

        /* we mustn't return earlier than expected wait time */
        ticks = get_tmo_tick(current) - current_tick;
        if(ticks <= 0)
            break;
    }

#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
    if(UNLIKELY(!ev))
        ; /* just waiting for something */
    else
#endif
    if(rd != wr)
    {
        q->read = rd + 1;
        rd &= QUEUE_LENGTH_MASK;
        *ev = q->events[rd];

        /* Get data for a waiting thread if one */
        queue_do_fetch_sender(q->send, rd);
    }
    else
    {
        ev->id = SYS_TIMEOUT;
        ev->data = 0;
    }

    corelock_unlock(&q->cl);
    restore_irq(oldlevel);
}

void queue_post(struct event_queue *q, long id, intptr_t data)
{
    int oldlevel;
    unsigned int wr;

    oldlevel = disable_irq_save();
    corelock_lock(&q->cl);

    wr = q->write++ & QUEUE_LENGTH_MASK;

    KERNEL_ASSERT((q->write - q->read) <= QUEUE_LENGTH,
                  "queue_post ovf q=%p", q);

    q->events[wr].id = id;
    q->events[wr].data = data;

    /* overflow protect - unblock any thread waiting at this index */
    queue_do_unblock_sender(q->send, wr);

    /* Wakeup a waiting thread if any */
    queue_wake_waiter(q);

    corelock_unlock(&q->cl);
    restore_irq(oldlevel);
}
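
/* Illustrative usage sketch (not part of this file): an asynchronous post
 * from a producer, and a consumer that waits with a timeout so it can do
 * periodic housekeeping. The names reuse the hypothetical ids from the
 * sketch above; HZ is the kernel tick rate.
 */
#if 0
static void example_producer(struct event_queue *q)
{
    queue_post(q, EXAMPLE_EV_PING, 42); /* never blocks the caller */
}

static void example_timed_consumer(struct event_queue *q)
{
    struct queue_event ev;

    while(1)
    {
        queue_wait_w_tmo(q, &ev, HZ); /* wake at least once per second */

        if(ev.id == SYS_TIMEOUT)
        {
            /* nothing arrived within HZ ticks - do periodic work */
            continue;
        }

        /* handle ev.id / ev.data as in the plain queue_wait() loop */
    }
}
#endif
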
#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
/* IRQ handlers are not allowed use of this function - we only aim to
   protect the queue integrity by turning them off. */
intptr_t queue_send(struct event_queue *q, long id, intptr_t data)
{
    int oldlevel;
    unsigned int wr;

    oldlevel = disable_irq_save();

    ASSERT_CPU_MODE(CPU_MODE_THREAD_CONTEXT, oldlevel);

    corelock_lock(&q->cl);

    wr = q->write++ & QUEUE_LENGTH_MASK;

    KERNEL_ASSERT((q->write - q->read) <= QUEUE_LENGTH,
                  "queue_send ovf q=%p", q);

    q->events[wr].id = id;
    q->events[wr].data = data;

    if(LIKELY(q->send))
    {
        struct queue_sender_list *send = q->send;
        struct thread_entry **spp = &send->senders[wr];
        struct thread_entry *current = __running_self_entry();

        /* overflow protect - unblock any thread waiting at this index */
        queue_release_sender(spp, 0);

        /* Wakeup a waiting thread if any */
        queue_wake_waiter(q);

        /* Save thread in slot, add to list and wait for reply */
        *spp = current;
        block_thread(current, TIMEOUT_BLOCK, &send->list, q->blocker_p);

        corelock_unlock(&q->cl);
        switch_thread();

        return current->retval;
    }

    /* Function as queue_post if sending is not enabled */
    queue_wake_waiter(q);

    corelock_unlock(&q->cl);
    restore_irq(oldlevel);

    return 0;
}

#if 0 /* not used now but probably will be later */
/* Query if the last message dequeued was added by queue_send or not */
bool queue_in_queue_send(struct event_queue *q)
{
    bool in_send;

#if NUM_CORES > 1
    int oldlevel = disable_irq_save();
    corelock_lock(&q->cl);
#endif

    in_send = q->send && q->send->curr_sender;

#if NUM_CORES > 1
    corelock_unlock(&q->cl);
    restore_irq(oldlevel);
#endif

    return in_send;
}
#endif

/* Replies with retval to the last dequeued message sent with queue_send */
void queue_reply(struct event_queue *q, intptr_t retval)
{
    if(q->send && q->send->curr_sender)
    {
        int oldlevel = disable_irq_save();
        corelock_lock(&q->cl);

        struct queue_sender_list *send = q->send;
        if(send)
            queue_release_sender(&send->curr_sender, retval);

        corelock_unlock(&q->cl);
        restore_irq(oldlevel);
    }
}
#endif /* HAVE_EXTENDED_MESSAGING_AND_NAME */
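
/* Illustrative usage sketch (not part of this file): synchronous messaging
 * between a client thread and the queue's owner, assuming
 * queue_enable_queue_send() was called as in the earlier sketch. The event
 * id and reply arithmetic are hypothetical.
 */
#if 0
/* Client side: blocks until the owner replies (or until the default 0 reply
   is issued on its behalf). */
static intptr_t example_client_request(struct event_queue *q, intptr_t arg)
{
    return queue_send(q, EXAMPLE_EV_PING, arg);
}

/* Owner side: dequeue, act, then wake the blocked sender with a value.
   Note that the next queue_wait() auto-replies 0 to any sender that was
   not answered explicitly. */
static void example_owner_loop(struct event_queue *q)
{
    struct queue_event ev;

    while(1)
    {
        queue_wait(q, &ev);

        if(ev.id == EXAMPLE_EV_PING)
            queue_reply(q, ev.data + 1);
    }
}
#endif
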
#ifdef HAVE_EXTENDED_MESSAGING_AND_NAME
/* Scan the event queue from head to tail, returning any event from the
   filter list that was found, optionally removing the event. If an
   event is returned, synchronous events are handled in the same manner as
   with queue_wait(_w_tmo); if discarded, then as queue_clear.
   If filters are NULL, any event matches. If filters exist, the default
   is to search the full queue depth.
   Earlier filters take precedence.

   Return true if an event was found, false otherwise. */
bool queue_peek_ex(struct event_queue *q, struct queue_event *ev,
                   unsigned int flags, const long (*filters)[2])
{
    bool have_msg;
    unsigned int rd, wr;
    int oldlevel;

    if(LIKELY(q->read == q->write))
        return false; /* Empty: do nothing further */

    have_msg = false;

    oldlevel = disable_irq_save();
    corelock_lock(&q->cl);

    /* Starting at the head, find first match */
    for(rd = q->read, wr = q->write; rd != wr; rd++)
    {
        struct queue_event *e = &q->events[rd & QUEUE_LENGTH_MASK];

        if(filters)
        {
            /* Have filters - find the first thing that passes */
            const long (* f)[2] = filters;
            const long (* const f_last)[2] =
                &filters[flags & QPEEK_FILTER_COUNT_MASK];
            long id = e->id;

            do
            {
                if(UNLIKELY(id >= (*f)[0] && id <= (*f)[1]))
                    goto passed_filter;
            }
            while(++f <= f_last);

            if(LIKELY(!(flags & QPEEK_FILTER_HEAD_ONLY)))
                continue; /* No match; test next event */
            else
                break; /* Only check the head */
        }
        /* else - anything passes */

    passed_filter:

        /* Found a matching event */
        have_msg = true;

        if(ev)
            *ev = *e; /* Caller wants the event */

        if(flags & QPEEK_REMOVE_EVENTS)
        {
            /* Do event removal */
            unsigned int r = q->read;
            q->read = r + 1; /* Advance head */

            if(ev)
            {
                /* Auto-reply */
                queue_do_auto_reply(q->send);
                /* Get the thread waiting for reply, if any */
                queue_do_fetch_sender(q->send, rd & QUEUE_LENGTH_MASK);
            }
            else
            {
                /* Release any thread waiting on this message */
                queue_do_unblock_sender(q->send, rd & QUEUE_LENGTH_MASK);
            }

            /* Slide messages forward into the gap if not at the head */
            while(rd != r)
            {
                unsigned int dst = rd & QUEUE_LENGTH_MASK;
                unsigned int src = --rd & QUEUE_LENGTH_MASK;

                q->events[dst] = q->events[src];
                /* Keep sender wait list in sync */
                if(q->send)
                    q->send->senders[dst] = q->send->senders[src];
            }
        }

        break;
    }

    corelock_unlock(&q->cl);
    restore_irq(oldlevel);

    return have_msg;
}

bool queue_peek(struct event_queue *q, struct queue_event *ev)
{
    return queue_peek_ex(q, ev, 0, NULL);
}

void queue_remove_from_head(struct event_queue *q, long id)
{
    const long f[2] = { id, id };
    while (queue_peek_ex(q, NULL,
            QPEEK_FILTER_HEAD_ONLY | QPEEK_REMOVE_EVENTS, &f));
}
#else /* !HAVE_EXTENDED_MESSAGING_AND_NAME */
/* The more powerful routines aren't required */
bool queue_peek(struct event_queue *q, struct queue_event *ev)
{
    unsigned int rd;

    if(q->read == q->write)
        return false;

    bool have_msg = false;

    int oldlevel = disable_irq_save();
    corelock_lock(&q->cl);

    rd = q->read;
    if(rd != q->write)
    {
        *ev = q->events[rd & QUEUE_LENGTH_MASK];
        have_msg = true;
    }

    corelock_unlock(&q->cl);
    restore_irq(oldlevel);

    return have_msg;
}

void queue_remove_from_head(struct event_queue *q, long id)
{
    int oldlevel;

    oldlevel = disable_irq_save();
    corelock_lock(&q->cl);

    while(q->read != q->write)
    {
        unsigned int rd = q->read & QUEUE_LENGTH_MASK;

        if(q->events[rd].id != id)
        {
            break;
        }

        /* Release any thread waiting on this message */
        queue_do_unblock_sender(q->send, rd);

        q->read++;
    }

    corelock_unlock(&q->cl);
    restore_irq(oldlevel);
}
#endif /* HAVE_EXTENDED_MESSAGING_AND_NAME */
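
/* Illustrative usage sketch (not part of this file, and only meaningful when
 * HAVE_EXTENDED_MESSAGING_AND_NAME is defined): peeking for an inclusive
 * range of event ids without removing anything. The id range reuses the
 * hypothetical ids from the sketches above.
 */
#if 0
static bool example_pending_in_range(struct event_queue *q,
                                     struct queue_event *ev)
{
    /* A single [first, last] filter; with flags == 0 the filter count bits
       select exactly one entry and the whole queue depth is scanned. */
    static const long range[1][2] = { { EXAMPLE_EV_PING, EXAMPLE_EV_STOP } };
    return queue_peek_ex(q, ev, 0, range);
}
#endif
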
/* Poll queue to see if a message exists - be careful using the result if
 * queue_remove_from_head is called while messages are posted - possibly use
 * queue_wait_w_tmo(&q, 0) in that case, or else a removed message that
 * unsignals the queue may cause an unwanted block */
bool queue_empty(const struct event_queue* q)
{
    return ( q->read == q->write );
}

/* Poll queue to see if it is full */
bool queue_full(const struct event_queue* q)
{
    return ((q->write - q->read) >= QUEUE_LENGTH);
}

void queue_clear(struct event_queue* q)
{
    int oldlevel;

    oldlevel = disable_irq_save();
    corelock_lock(&q->cl);

    /* Release all threads waiting in the queue for a reply -
       dequeued sent message will be handled by owning thread */
    queue_release_all_senders(q);

    q->read = q->write;

    corelock_unlock(&q->cl);
    restore_irq(oldlevel);
}

/**
 * The number of events waiting in the queue.
 *
 * @param q the event_queue to query
 * @return number of events in the queue
 */
int queue_count(const struct event_queue *q)
{
    return q->write - q->read;
}

int queue_broadcast(long id, intptr_t data)
{
    struct event_queue **p = all_queues.queues;
    struct event_queue *q;

#if NUM_CORES > 1
    int oldlevel = disable_irq_save();
    corelock_lock(&all_queues.cl);
#endif

    for(q = *p; q != NULL; q = *(++p))
    {
        queue_post(q, id, data);
    }

#if NUM_CORES > 1
    corelock_unlock(&all_queues.cl);
    restore_irq(oldlevel);
#endif

    return p - all_queues.queues;
}

void init_queues(void)
{
    corelock_init(&all_queues.cl);
}
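
/* Illustrative usage sketch (not part of this file): init_queues() is
 * expected to run during kernel startup, before queues are registered;
 * after that, queue_broadcast() posts one event to every registered queue
 * and returns how many it reached. The event id is the hypothetical one
 * from the sketches above.
 */
#if 0
static void example_notify_all_threads(void)
{
    int reached = queue_broadcast(EXAMPLE_EV_PING, 0);
    (void)reached; /* e.g. log the number of queues that were posted to */
}
#endif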