Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Optimize spi_sync path

Merge series from David Jander <david@protonic.nl>:

These patches optimize the spi_sync call for the common case that the
worker thread is idle and the queue is empty. They also open the
possibility of further optimizing the async path, since it no longer
needs to take the direct sync path into account.

As an example for the performance gain, on an i.MX8MM SoC with a SPI CAN
controller attached (MCP2518FD), the time the interrupt line stays
active (which corresponds roughly with the time it takes to send 3
relatively short consecutive spi_sync messages) is reduced from 98us to
only 72us by this patch.

A note about message ordering:

This patch series should not change message-ordering behavior for
messages issued from the same context. This means that if a client driver
issues one or more spi_async() messages immediately followed by a
spi_sync() message in the same context, it can still rely on these
messages being sent out in the order they were fired.

+213 -138
+194 -133
drivers/spi/spi.c
··· 1549 1549 } 1550 1550 } 1551 1551 1552 - /** 1553 - * __spi_pump_messages - function which processes spi message queue 1554 - * @ctlr: controller to process queue for 1555 - * @in_kthread: true if we are in the context of the message pump thread 1556 - * 1557 - * This function checks if there is any spi message in the queue that 1558 - * needs processing and if so call out to the driver to initialize hardware 1559 - * and transfer each message. 1560 - * 1561 - * Note that it is called both from the kthread itself and also from 1562 - * inside spi_sync(); the queue extraction handling at the top of the 1563 - * function should deal with this safely. 1564 - */ 1565 - static void __spi_pump_messages(struct spi_controller *ctlr, bool in_kthread) 1552 + static int __spi_pump_transfer_message(struct spi_controller *ctlr, 1553 + struct spi_message *msg, bool was_busy) 1566 1554 { 1567 1555 struct spi_transfer *xfer; 1568 - struct spi_message *msg; 1569 - bool was_busy = false; 1570 - unsigned long flags; 1571 1556 int ret; 1572 1557 1573 - /* Lock queue */ 1574 - spin_lock_irqsave(&ctlr->queue_lock, flags); 1575 - 1576 - /* Make sure we are not already running a message */ 1577 - if (ctlr->cur_msg) { 1578 - spin_unlock_irqrestore(&ctlr->queue_lock, flags); 1579 - return; 1580 - } 1581 - 1582 - /* If another context is idling the device then defer */ 1583 - if (ctlr->idling) { 1584 - kthread_queue_work(ctlr->kworker, &ctlr->pump_messages); 1585 - spin_unlock_irqrestore(&ctlr->queue_lock, flags); 1586 - return; 1587 - } 1588 - 1589 - /* Check if the queue is idle */ 1590 - if (list_empty(&ctlr->queue) || !ctlr->running) { 1591 - if (!ctlr->busy) { 1592 - spin_unlock_irqrestore(&ctlr->queue_lock, flags); 1593 - return; 1594 - } 1595 - 1596 - /* Defer any non-atomic teardown to the thread */ 1597 - if (!in_kthread) { 1598 - if (!ctlr->dummy_rx && !ctlr->dummy_tx && 1599 - !ctlr->unprepare_transfer_hardware) { 1600 - spi_idle_runtime_pm(ctlr); 1601 - ctlr->busy = false; 
1602 - trace_spi_controller_idle(ctlr); 1603 - } else { 1604 - kthread_queue_work(ctlr->kworker, 1605 - &ctlr->pump_messages); 1606 - } 1607 - spin_unlock_irqrestore(&ctlr->queue_lock, flags); 1608 - return; 1609 - } 1610 - 1611 - ctlr->busy = false; 1612 - ctlr->idling = true; 1613 - spin_unlock_irqrestore(&ctlr->queue_lock, flags); 1614 - 1615 - kfree(ctlr->dummy_rx); 1616 - ctlr->dummy_rx = NULL; 1617 - kfree(ctlr->dummy_tx); 1618 - ctlr->dummy_tx = NULL; 1619 - if (ctlr->unprepare_transfer_hardware && 1620 - ctlr->unprepare_transfer_hardware(ctlr)) 1621 - dev_err(&ctlr->dev, 1622 - "failed to unprepare transfer hardware\n"); 1623 - spi_idle_runtime_pm(ctlr); 1624 - trace_spi_controller_idle(ctlr); 1625 - 1626 - spin_lock_irqsave(&ctlr->queue_lock, flags); 1627 - ctlr->idling = false; 1628 - spin_unlock_irqrestore(&ctlr->queue_lock, flags); 1629 - return; 1630 - } 1631 - 1632 - /* Extract head of queue */ 1633 - msg = list_first_entry(&ctlr->queue, struct spi_message, queue); 1634 - ctlr->cur_msg = msg; 1635 - 1636 - list_del_init(&msg->queue); 1637 - if (ctlr->busy) 1638 - was_busy = true; 1639 - else 1640 - ctlr->busy = true; 1641 - spin_unlock_irqrestore(&ctlr->queue_lock, flags); 1642 - 1643 - mutex_lock(&ctlr->io_mutex); 1644 - 1645 1558 if (!was_busy && ctlr->auto_runtime_pm) { 1646 - ret = pm_runtime_resume_and_get(ctlr->dev.parent); 1559 + ret = pm_runtime_get_sync(ctlr->dev.parent); 1647 1560 if (ret < 0) { 1561 + pm_runtime_put_noidle(ctlr->dev.parent); 1648 1562 dev_err(&ctlr->dev, "Failed to power device: %d\n", 1649 1563 ret); 1650 - mutex_unlock(&ctlr->io_mutex); 1651 - return; 1564 + return ret; 1652 1565 } 1653 1566 } 1654 1567 ··· 1581 1668 msg->status = ret; 1582 1669 spi_finalize_current_message(ctlr); 1583 1670 1584 - mutex_unlock(&ctlr->io_mutex); 1585 - return; 1671 + return ret; 1586 1672 } 1587 1673 } 1588 1674 ··· 1594 1682 ret); 1595 1683 msg->status = ret; 1596 1684 spi_finalize_current_message(ctlr); 1597 - goto out; 1685 + return 
ret; 1598 1686 } 1599 - ctlr->cur_msg_prepared = true; 1687 + msg->prepared = true; 1600 1688 } 1601 1689 1602 1690 ret = spi_map_msg(ctlr, msg); 1603 1691 if (ret) { 1604 1692 msg->status = ret; 1605 1693 spi_finalize_current_message(ctlr); 1606 - goto out; 1694 + return ret; 1607 1695 } 1608 1696 1609 1697 if (!ctlr->ptp_sts_supported && !ctlr->transfer_one) { ··· 1613 1701 } 1614 1702 } 1615 1703 1704 + /* 1705 + * Drivers implementation of transfer_one_message() must arrange for 1706 + * spi_finalize_current_message() to get called. Most drivers will do 1707 + * this in the calling context, but some don't. For those cases, a 1708 + * completion is used to guarantee that this function does not return 1709 + * until spi_finalize_current_message() is done accessing 1710 + * ctlr->cur_msg. 1711 + * Use of the following two flags enable to opportunistically skip the 1712 + * use of the completion since its use involves expensive spin locks. 1713 + * In case of a race with the context that calls 1714 + * spi_finalize_current_message() the completion will always be used, 1715 + * due to strict ordering of these flags using barriers. 1716 + */ 1717 + WRITE_ONCE(ctlr->cur_msg_incomplete, true); 1718 + WRITE_ONCE(ctlr->cur_msg_need_completion, false); 1719 + reinit_completion(&ctlr->cur_msg_completion); 1720 + smp_wmb(); /* make these available to spi_finalize_current_message */ 1721 + 1616 1722 ret = ctlr->transfer_one_message(ctlr, msg); 1617 1723 if (ret) { 1618 1724 dev_err(&ctlr->dev, 1619 - "failed to transfer one message from queue: %d\n", 1620 - ret); 1621 - goto out; 1725 + "failed to transfer one message from queue\n"); 1726 + return ret; 1727 + } else { 1728 + WRITE_ONCE(ctlr->cur_msg_need_completion, true); 1729 + smp_mb(); /* see spi_finalize_current_message()... 
*/ 1730 + if (READ_ONCE(ctlr->cur_msg_incomplete)) 1731 + wait_for_completion(&ctlr->cur_msg_completion); 1622 1732 } 1623 1733 1624 - out: 1734 + return 0; 1735 + } 1736 + 1737 + /** 1738 + * __spi_pump_messages - function which processes spi message queue 1739 + * @ctlr: controller to process queue for 1740 + * @in_kthread: true if we are in the context of the message pump thread 1741 + * 1742 + * This function checks if there is any spi message in the queue that 1743 + * needs processing and if so call out to the driver to initialize hardware 1744 + * and transfer each message. 1745 + * 1746 + * Note that it is called both from the kthread itself and also from 1747 + * inside spi_sync(); the queue extraction handling at the top of the 1748 + * function should deal with this safely. 1749 + */ 1750 + static void __spi_pump_messages(struct spi_controller *ctlr, bool in_kthread) 1751 + { 1752 + struct spi_message *msg; 1753 + bool was_busy = false; 1754 + unsigned long flags; 1755 + int ret; 1756 + 1757 + /* Take the IO mutex */ 1758 + mutex_lock(&ctlr->io_mutex); 1759 + 1760 + /* Lock queue */ 1761 + spin_lock_irqsave(&ctlr->queue_lock, flags); 1762 + 1763 + /* Make sure we are not already running a message */ 1764 + if (ctlr->cur_msg) 1765 + goto out_unlock; 1766 + 1767 + /* Check if the queue is idle */ 1768 + if (list_empty(&ctlr->queue) || !ctlr->running) { 1769 + if (!ctlr->busy) 1770 + goto out_unlock; 1771 + 1772 + /* Defer any non-atomic teardown to the thread */ 1773 + if (!in_kthread) { 1774 + if (!ctlr->dummy_rx && !ctlr->dummy_tx && 1775 + !ctlr->unprepare_transfer_hardware) { 1776 + spi_idle_runtime_pm(ctlr); 1777 + ctlr->busy = false; 1778 + ctlr->queue_empty = true; 1779 + trace_spi_controller_idle(ctlr); 1780 + } else { 1781 + kthread_queue_work(ctlr->kworker, 1782 + &ctlr->pump_messages); 1783 + } 1784 + goto out_unlock; 1785 + } 1786 + 1787 + ctlr->busy = false; 1788 + spin_unlock_irqrestore(&ctlr->queue_lock, flags); 1789 + 1790 + 
kfree(ctlr->dummy_rx); 1791 + ctlr->dummy_rx = NULL; 1792 + kfree(ctlr->dummy_tx); 1793 + ctlr->dummy_tx = NULL; 1794 + if (ctlr->unprepare_transfer_hardware && 1795 + ctlr->unprepare_transfer_hardware(ctlr)) 1796 + dev_err(&ctlr->dev, 1797 + "failed to unprepare transfer hardware\n"); 1798 + spi_idle_runtime_pm(ctlr); 1799 + trace_spi_controller_idle(ctlr); 1800 + 1801 + spin_lock_irqsave(&ctlr->queue_lock, flags); 1802 + ctlr->queue_empty = true; 1803 + goto out_unlock; 1804 + } 1805 + 1806 + /* Extract head of queue */ 1807 + msg = list_first_entry(&ctlr->queue, struct spi_message, queue); 1808 + ctlr->cur_msg = msg; 1809 + 1810 + list_del_init(&msg->queue); 1811 + if (ctlr->busy) 1812 + was_busy = true; 1813 + else 1814 + ctlr->busy = true; 1815 + spin_unlock_irqrestore(&ctlr->queue_lock, flags); 1816 + 1817 + ret = __spi_pump_transfer_message(ctlr, msg, was_busy); 1818 + 1819 + if (!ret) 1820 + kthread_queue_work(ctlr->kworker, &ctlr->pump_messages); 1821 + ctlr->cur_msg = NULL; 1822 + ctlr->fallback = false; 1823 + 1625 1824 mutex_unlock(&ctlr->io_mutex); 1626 1825 1627 1826 /* Prod the scheduler in case transfer_one() was busy waiting */ 1628 1827 if (!ret) 1629 1828 cond_resched(); 1829 + return; 1830 + 1831 + out_unlock: 1832 + spin_unlock_irqrestore(&ctlr->queue_lock, flags); 1833 + mutex_unlock(&ctlr->io_mutex); 1630 1834 } 1631 1835 1632 1836 /** ··· 1867 1839 { 1868 1840 ctlr->running = false; 1869 1841 ctlr->busy = false; 1842 + ctlr->queue_empty = true; 1870 1843 1871 1844 ctlr->kworker = kthread_create_worker(0, dev_name(&ctlr->dev)); 1872 1845 if (IS_ERR(ctlr->kworker)) { ··· 1926 1897 { 1927 1898 struct spi_transfer *xfer; 1928 1899 struct spi_message *mesg; 1929 - unsigned long flags; 1930 1900 int ret; 1931 1901 1932 - spin_lock_irqsave(&ctlr->queue_lock, flags); 1933 1902 mesg = ctlr->cur_msg; 1934 - spin_unlock_irqrestore(&ctlr->queue_lock, flags); 1935 1903 1936 1904 if (!ctlr->ptp_sts_supported && !ctlr->transfer_one) { 1937 1905 
list_for_each_entry(xfer, &mesg->transfers, transfer_list) { ··· 1952 1926 */ 1953 1927 spi_res_release(ctlr, mesg); 1954 1928 1955 - if (ctlr->cur_msg_prepared && ctlr->unprepare_message) { 1929 + if (mesg->prepared && ctlr->unprepare_message) { 1956 1930 ret = ctlr->unprepare_message(ctlr, mesg); 1957 1931 if (ret) { 1958 1932 dev_err(&ctlr->dev, "failed to unprepare message: %d\n", ··· 1960 1934 } 1961 1935 } 1962 1936 1963 - spin_lock_irqsave(&ctlr->queue_lock, flags); 1964 - ctlr->cur_msg = NULL; 1965 - ctlr->cur_msg_prepared = false; 1966 - ctlr->fallback = false; 1967 - kthread_queue_work(ctlr->kworker, &ctlr->pump_messages); 1968 - spin_unlock_irqrestore(&ctlr->queue_lock, flags); 1937 + mesg->prepared = false; 1938 + 1939 + WRITE_ONCE(ctlr->cur_msg_incomplete, false); 1940 + smp_mb(); /* See __spi_pump_transfer_message()... */ 1941 + if (READ_ONCE(ctlr->cur_msg_need_completion)) 1942 + complete(&ctlr->cur_msg_completion); 1969 1943 1970 1944 trace_spi_message_done(mesg); 1971 1945 ··· 2068 2042 msg->status = -EINPROGRESS; 2069 2043 2070 2044 list_add_tail(&msg->queue, &ctlr->queue); 2045 + ctlr->queue_empty = false; 2071 2046 if (!ctlr->busy && need_pump) 2072 2047 kthread_queue_work(ctlr->kworker, &ctlr->pump_messages); 2073 2048 ··· 3052 3025 } 3053 3026 ctlr->bus_lock_flag = 0; 3054 3027 init_completion(&ctlr->xfer_completion); 3028 + init_completion(&ctlr->cur_msg_completion); 3055 3029 if (!ctlr->max_dma_len) 3056 3030 ctlr->max_dma_len = INT_MAX; 3057 3031 ··· 3965 3937 3966 3938 } 3967 3939 3940 + static void __spi_transfer_message_noqueue(struct spi_controller *ctlr, struct spi_message *msg) 3941 + { 3942 + bool was_busy; 3943 + int ret; 3944 + 3945 + mutex_lock(&ctlr->io_mutex); 3946 + 3947 + was_busy = ctlr->busy; 3948 + 3949 + ctlr->cur_msg = msg; 3950 + ret = __spi_pump_transfer_message(ctlr, msg, was_busy); 3951 + if (ret) 3952 + goto out; 3953 + 3954 + ctlr->cur_msg = NULL; 3955 + ctlr->fallback = false; 3956 + 3957 + if (!was_busy) { 3958 + 
kfree(ctlr->dummy_rx); 3959 + ctlr->dummy_rx = NULL; 3960 + kfree(ctlr->dummy_tx); 3961 + ctlr->dummy_tx = NULL; 3962 + if (ctlr->unprepare_transfer_hardware && 3963 + ctlr->unprepare_transfer_hardware(ctlr)) 3964 + dev_err(&ctlr->dev, 3965 + "failed to unprepare transfer hardware\n"); 3966 + spi_idle_runtime_pm(ctlr); 3967 + } 3968 + 3969 + out: 3970 + mutex_unlock(&ctlr->io_mutex); 3971 + } 3972 + 3968 3973 /*-------------------------------------------------------------------------*/ 3969 3974 3970 3975 /* ··· 4016 3955 DECLARE_COMPLETION_ONSTACK(done); 4017 3956 int status; 4018 3957 struct spi_controller *ctlr = spi->controller; 4019 - unsigned long flags; 4020 3958 4021 3959 status = __spi_validate(spi, message); 4022 3960 if (status != 0) 4023 3961 return status; 4024 3962 4025 - message->complete = spi_complete; 4026 - message->context = &done; 4027 3963 message->spi = spi; 4028 3964 4029 3965 SPI_STATISTICS_INCREMENT_FIELD(ctlr->pcpu_statistics, spi_sync); 4030 3966 SPI_STATISTICS_INCREMENT_FIELD(spi->pcpu_statistics, spi_sync); 4031 3967 4032 3968 /* 4033 - * If we're not using the legacy transfer method then we will 4034 - * try to transfer in the calling context so special case. 4035 - * This code would be less tricky if we could remove the 4036 - * support for driver implemented message queues. 3969 + * Checking queue_empty here only guarantees async/sync message 3970 + * ordering when coming from the same context. It does not need to 3971 + * guard against reentrancy from a different context. The io_mutex 3972 + * will catch those cases. 
4037 3973 */ 4038 - if (ctlr->transfer == spi_queued_transfer) { 4039 - spin_lock_irqsave(&ctlr->bus_lock_spinlock, flags); 3974 + if (READ_ONCE(ctlr->queue_empty)) { 3975 + message->actual_length = 0; 3976 + message->status = -EINPROGRESS; 4040 3977 4041 3978 trace_spi_message_submit(message); 4042 3979 4043 - status = __spi_queued_transfer(spi, message, false); 3980 + SPI_STATISTICS_INCREMENT_FIELD(ctlr->pcpu_statistics, spi_sync_immediate); 3981 + SPI_STATISTICS_INCREMENT_FIELD(spi->pcpu_statistics, spi_sync_immediate); 4044 3982 4045 - spin_unlock_irqrestore(&ctlr->bus_lock_spinlock, flags); 4046 - } else { 4047 - status = spi_async_locked(spi, message); 3983 + __spi_transfer_message_noqueue(ctlr, message); 3984 + 3985 + return message->status; 4048 3986 } 4049 3987 3988 + /* 3989 + * There are messages in the async queue that could have originated 3990 + * from the same context, so we need to preserve ordering. 3991 + * Therefor we send the message to the async queue and wait until they 3992 + * are completed. 3993 + */ 3994 + message->complete = spi_complete; 3995 + message->context = &done; 3996 + status = spi_async_locked(spi, message); 4050 3997 if (status == 0) { 4051 - /* Push out the messages in the calling context if we can */ 4052 - if (ctlr->transfer == spi_queued_transfer) { 4053 - SPI_STATISTICS_INCREMENT_FIELD(ctlr->pcpu_statistics, 4054 - spi_sync_immediate); 4055 - SPI_STATISTICS_INCREMENT_FIELD(spi->pcpu_statistics, 4056 - spi_sync_immediate); 4057 - __spi_pump_messages(ctlr, false); 4058 - } 4059 - 4060 3998 wait_for_completion(&done); 4061 3999 status = message->status; 4062 4000 } 4063 4001 message->context = NULL; 4002 + 4064 4003 return status; 4065 4004 } 4066 4005
+19 -5
include/linux/spi/spi.h
··· 383 383 * @pump_messages: work struct for scheduling work to the message pump 384 384 * @queue_lock: spinlock to syncronise access to message queue 385 385 * @queue: message queue 386 - * @idling: the device is entering idle state 387 386 * @cur_msg: the currently in-flight message 388 - * @cur_msg_prepared: spi_prepare_message was called for the currently 389 - * in-flight message 387 + * @cur_msg_completion: a completion for the current in-flight message 388 + * @cur_msg_incomplete: Flag used internally to opportunistically skip 389 + * the @cur_msg_completion. This flag is used to check if the driver has 390 + * already called spi_finalize_current_message(). 391 + * @cur_msg_need_completion: Flag used internally to opportunistically skip 392 + * the @cur_msg_completion. This flag is used to signal the context that 393 + * is running spi_finalize_current_message() that it needs to complete() 390 394 * @cur_msg_mapped: message has been mapped for DMA 391 395 * @last_cs: the last chip_select that is recorded by set_cs, -1 on non chip 392 396 * selected ··· 467 463 * @irq_flags: Interrupt enable state during PTP system timestamping 468 464 * @fallback: fallback to pio if dma transfer return failure with 469 465 * SPI_TRANS_FAIL_NO_START. 466 + * @queue_empty: signal green light for opportunistically skipping the queue 467 + * for spi_sync transfers. 470 468 * 471 469 * Each SPI controller can communicate with one or more @spi_device 472 470 * children. 
These make a small bus, sharing MOSI, MISO and SCK signals ··· 622 616 spinlock_t queue_lock; 623 617 struct list_head queue; 624 618 struct spi_message *cur_msg; 625 - bool idling; 619 + struct completion cur_msg_completion; 620 + bool cur_msg_incomplete; 621 + bool cur_msg_need_completion; 626 622 bool busy; 627 623 bool running; 628 624 bool rt; 629 625 bool auto_runtime_pm; 630 - bool cur_msg_prepared; 631 626 bool cur_msg_mapped; 632 627 char last_cs; 633 628 bool last_cs_mode_high; ··· 687 680 688 681 /* Interrupt enable state during PTP system timestamping */ 689 682 unsigned long irq_flags; 683 + 684 + /* Flag for enabling opportunistic skipping of the queue in spi_sync */ 685 + bool queue_empty; 690 686 }; 691 687 692 688 static inline void *spi_controller_get_devdata(struct spi_controller *ctlr) ··· 998 988 * @queue: for use by whichever driver currently owns the message 999 989 * @state: for use by whichever driver currently owns the message 1000 990 * @resources: for resource management when the spi message is processed 991 + * @prepared: spi_prepare_message was called for the this message 1001 992 * 1002 993 * A @spi_message is used to execute an atomic sequence of data transfers, 1003 994 * each represented by a struct spi_transfer. The sequence is "atomic" ··· 1048 1037 1049 1038 /* list of spi_res reources when the spi message is processed */ 1050 1039 struct list_head resources; 1040 + 1041 + /* spi_prepare_message was called for this message */ 1042 + bool prepared; 1051 1043 }; 1052 1044 1053 1045 static inline void spi_message_init_no_memset(struct spi_message *m)