1、原子操作
通常我们代码中的a = a + 1这样的一行语句,翻译成汇编后蕴含着3条指令:
ldr x0, &a
add x0,x0,#1
str x0,&a
即
(1)从内存中读取a变量到X0寄存器
(2)X0寄存器加1
(3)将X0写入到内存a中
既然是3条指令,那么就有可能并发,也就意味着返回的结果可能不说预期的。
然后在linux kernel的操作系统中,提供访问原子变量的函数,用来解决上述问题。其中部分原子操作的API如下:
atomic_read
atomic_add_return(i,v)
atomic_add(i,v)
atomic_inc(v)
atomic_add_unless(v,a,u)
atomic_inc_not_zero(v)
atomic_sub_return(i,v)
atomic_sub_and_test(i,v)
atomic_sub(i,v)
atomic_dec(v)
atomic_cmpxchg(v,old,new)
那么操作系统(仅仅是软件而已)是如何保证原子操作的呢?(还是得靠硬件),硬件原理是什么呢?
以上的那些API函数,在底层调用的其实都是如下__lse_atomic_add_return##name宏的封装,这段代码中最核心的也就是ldadd指令了,这是armv8.1增加的LSE(Large System Extension)feature。
(linux/arch/arm64/include/asm/atomic_lse.h)
static inline int __lse_atomic_add_return##name(int i, atomic_t *v) \
{ \
u32 tmp; \
\
asm volatile( \
__LSE_PREAMBLE \
" ldadd" #mb " %w[i], %w[tmp], %[v]\n" \
" add %w[i], %w[i], %w[tmp]" \
: [i] "+r" (i), [v] "+Q" (v->counter), [tmp] "=&r" (tmp) \
: "r" (v) \
: cl); \
\
return i; \
}
那么系统如果没有LSE扩展呢,即armv8.0,其实现的原型如下所示,这段代码中最核心的也就是ldxr、stxr指令了
(linux/arch/arm64/include/asm/atomic_ll_sc.h)
static inline void _ll_sc_atomic##op(int i, atomic_t *v)\
{ \
unsigned long tmp; \
int result; \
\
asm volatile(“// atomic_” #op “\n” \
__LL_SC_FALLBACK( \
" prfm pstl1strm, %2\n" \
“1: ldxr %w0, %2\n” \
" " #asm_op " %w0, %w0, %w3\n" \
" stxr %w1, %w0, %2\n" \
" cbnz %w1, 1b\n") \
: "=&r" (result), "=&r" (tmp), "+Q" (v->counter) \
: __stringify(constraint) "r" (i)); \
}
那么在armv8.0之前呢,如armv7是怎样实现的? 如下所示, 这段代码中最核心的也就是ldrex、strex指令了
(linux/arch/arm/include/asm/atomic.h)
static inline void atomic_##op(int i, atomic_t *v) \
{ \
unsigned long tmp;
int result; \
\
prefetchw(&v->counter); \
asm volatile(“@ atomic_” #op “\n” \
“1: ldrex %0, [%3]\n” \
" " #asm_op " %0, %0, %4\n" \
" strex %1, %0, [%3]\n" \
" teq %1, #0\n" \
" bne 1b" \
: "=&r" (result), "=&r" (tmp), "+Qo" (v->counter) \
: "r" (&v->counter), "Ir" (i) \
: "cc"); \
}
总结:
在很早期,使用arm的exclusive机制来实现的原子操作,exclusive相关的指令也就是ldrex、strex了,但在armv8后,exclusive机制的指令发生了变化变成了ldxr、stxr。但是又由于在一个大系统中,处理器是非常多的,竞争也激烈,使用独占的存储和加载指令可能要多次尝试才能成功,性能也就变得很差,在armv8.1为了解决该问题,增加了ldadd等相关的原子操作指令
spinlock 自旋锁
早期spinlock的设计
早期的spinlock的设计是锁的拥有者加锁时将锁的值设置为1,释放锁时将锁的值设置为0,这样做的缺点是会出现 先来抢占锁的进程一直抢占不到锁,而后来的进程可能一来 就能获取到锁。导致这个原因的是先抢占的进程和后抢占的进程在抢占锁时并没有一个先后关系,最终就是离锁所在的内存最近的cpu节点就有更多的机会抢占锁,离锁所在内存远的节点可能一直抢占不到。
新版spinlock设计
为了解决这个spinlock的不公平问题,linux 2.6.25内核以后,spinlock采用了一种"FIFO ticket-based"算法的spinlock机制,可以很好的实现先来先抢占的思想。具体的做法如下:
(1)、spinlock的核心字段有owner和next,在初始时,owner=next=0
(2)、当第一个进程抢占spinlock时,会在进程函数本地保存下next的值,也就是next=0,并将spinlock的next字段加1;
(3)、当获取spinlock的进程的本地next和spinlock的owner相等时,该进程就获取到spinlock;
(4)、由于第一个进程本地的next=0,并且spinlock的owner为0,所以第一个CPU获取到spinlock;
(5)、接着当第二个进程抢占spinlock,此时spinlock的next值为1,保存到本地,然后将spinlock的next字段加1。而spinlock的owner字段依然为0,第二个进程的本地next 不等于spinlock的owner,所以一直自旋等待spinlock;
(6)、第三个进程抢占spinlock,得到本地next值为2,然后将spinlock的next字段加1。此时spinlock的owner字段还是为0,所以第三个进程自旋等待。
(7)、当第一个进程处理完临界区以后,就释放spinlock,执行的操作是将spinlock的owner字段加1;
(8)、由于第二个进程和第三个进程都还在等待spinlock,他们会不停第获取spinlock的owner字段,并和自己本地的next值进行比较。当第二个进程发现自己的next值和spinlock的owner字段相等时(此时next == owner == 2),第二个进程就获取到spinlock。第三个进程的本地next值是3,和spinlock的owner字段不相等,所以继续等待;
(9)、只有在第二个进程释放了spinlock,就会将spinlock的owner字段加1,第三个进程才有机会获取spinlock。
我在举个例子,如下:
T1 : 进程1调用spin_lock,此时next=0, owner=0获得该锁,在arch_spin_lock()底层实现中,会next++
T2 : 进程2调用spin_lock,此时next=1, owner=0没有获得该锁,while(1)中调用wfe指令standby在那里,等待ownernext成立.
T3 : 进程3调用spin_lock,此时next=2, owner=0没有获得该锁,while(1)中调用wfe指令standby在那里,等待ownernext成立.
T4&T5 : 进程1调用spin_unlock,此时owner++,即owner=1,接着调用sev指令,让进程2和进程3退出standby状态,走while(1)流程,重新检查owner==next条件。此时进程2条件成立,进程3继续等待。进程2获得该锁,进程3继续等待。
Linux Kernel中的SpinLock的实现
(linux/include/linux/spinlock.h)
static __always_inline void spin_unlock(spinlock_t *lock)
{
raw_spin_unlock(&lock->rlock);
}
static __always_inline void spin_lock(spinlock_t *lock)
{
raw_spin_lock(&lock->rlock);
}
(linux/include/linux/spinlock.h)
define raw_spin_lock_irq(lock) _raw_spin_lock_irq(lock)
define raw_spin_lock_bh(lock) _raw_spin_lock_bh(lock)
define raw_spin_unlock(lock) _raw_spin_unlock(lock)
define raw_spin_unlock_irq(lock) _raw_spin_unlock_irq(lock)
define raw_spin_lock(lock) _raw_spin_lock(lock)
(linux/kernel/locking/spinlock.c)
ifdef CONFIG_UNINLINE_SPIN_UNLOCK
void __lockfunc _raw_spin_unlock(raw_spinlock_t *lock)
{
__raw_spin_unlock(lock);
}
EXPORT_SYMBOL(_raw_spin_unlock);
endif
ifndef CONFIG_INLINE_SPIN_LOCK
void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
{
__raw_spin_lock(lock);
}
EXPORT_SYMBOL(_raw_spin_lock);
endif
(linux/include/linux/spinlock_api_smp.h)
static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
spin_release(&lock->dep_map, RET_IP);
do_raw_spin_unlock(lock);
preempt_enable();
}
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
preempt_disable();
spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}
(linux/include/linux/spinlock.h)
static inline void do_raw_spin_unlock(raw_spinlock_t *lock) __releases(lock)
{
mmiowb_spin_unlock();
arch_spin_unlock(&lock->raw_lock);
__release(lock);
}
static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
{
__acquire(lock);
arch_spin_lock(&lock->raw_lock);
mmiowb_spin_lock();
}
对于arch_spin_lock()、arch_spin_unlock()的底层实现,不同的kernel版本也一直在变化
kernel 4.4
对于kernel4.4这个版本,还是比较好理解的,最核心的也就是ldaxr、ldaxr独占指令 ,以及stlrh release指令
(linux/arch/arm64/include/asm/spinlock.h)
static inline void arch_spin_lock(arch_spinlock_t *lock)
{
unsigned int tmp;
arch_spinlock_t lockval, newval;
asm volatile(
/ Atomically increment the next ticket. /
ARM64_LSE_ATOMIC_INSN(
/ LL/SC /
" prfm pstl1strm, %3\n"
“1: ldaxr %w0, %3\n”
" add %w1, %w0, %w5\n"
" stxr %w2, %w1, %3\n"
" cbnz %w2, 1b\n",
/ LSE atomics /
" mov %w2, %w5\n"
" ldadda %w2, %w0, %3\n"
" nop\n"
" nop\n"
" nop\n"
)
/ Did we get the lock? /
" eor %w1, %w0, %w0, ror #16\n"
" cbz %w1, 3f\n"
/*
- No: spin on the owner. Send a local event to avoid missing an
- unlock before the exclusive load.
*/
" sevl\n"
“2: wfe\n”
" ldaxrh %w2, %4\n"
" eor %w1, %w2, %w0, lsr #16\n"
" cbnz %w1, 2b\n"
/ We got the lock. Critical section starts here. /
“3:”
: "=&r" (lockval), "=&r" (newval), "=&r" (tmp), "+Q" (*lock)
: "Q" (lock->owner), "I" (1 << TICKET_SHIFT)
: "memory");
}
static inline void arch_spin_unlock(arch_spinlock_t *lock)
{
unsigned long tmp;
asm volatile(ARM64_LSE_ATOMIC_INSN(
/ LL/SC /
" ldrh %w1, %0\n"
" add %w1, %w1, #1\n"
" stlrh %w1, %0",
/ LSE atomics /
" mov %w1, #1\n"
" nop\n"
" staddlh %w1, %0")
: "=Q" (lock->owner), "=&r" (tmp)
:
: "memory");
}
kernel 5.10
而对于kernel5.10上的实现,就复杂的多
(linux/include/asm-generic/qspinlock.h)
define arch_spin_is_locked(l) queued_spin_is_locked(l)
define arch_spin_is_contended(l) queued_spin_is_contended(l)
define arch_spin_value_unlocked(l) queued_spin_value_unlocked(l)
define arch_spin_lock(l) queued_spin_lock(l)
define arch_spin_trylock(l) queued_spin_trylock(l)
define arch_spin_unlock(l) queued_spin_unlock(l)
queued_spin_lock等的实现在linux/kernel/locking/qspinlock.c,不是一般的复杂,我们就不贴代码分析了。总之底层用到了ldxr、stxr等exclusive独占的指令。
思考
————————————————
(1)、spin_lock中,会什么要禁止抢占(preempt_disable)?
(以单核为例)
P1 holds the lock and after a time is scheduled out.
Now P2 starts executing and let’s say requests the same spinlock. Since P1 has not released the spinlock, P2 must spin and do nothing. So the execution time of P2 is wasted. It makes more sense to let P1 release the lock and then allow it to be preempted.
Also since there is no other processor, simply by disallowing preemption we know that there will never be another process that runs in parallel on another processor and accesses the same critical section.
也就是说, process1正在持有该锁,此时发生了schedule后process2又去试图拿该锁,process2就会自旋在那里,时间就浪费了. 合理的做法应该是,让Process1(执行完临界区)释放该锁
————————————————
(2)、spinlock的临界区为什么不允许sleep(使用schedule类函数)?
Thread A调用spin_lock进去临界区,此时该cpu已经禁止抢占了(preempt_disable),如果此时调用sleep主动schedule出去后,该cpu就永远回不来了因为禁止抢占了。这样的话,如果threadB再试图获取该锁时,就会发生死锁
————————————————
(3)、假设只有一个cpu,如果把spin_lock中的preempt_disable注释掉, 即允许抢占。 那么使用spin_lock会产生死锁吗?
不会。threadA在执行时临界区时,被schedule出去了,thread B试图获取该锁,threadB会自旋那里(比较浪费cpu资源),等到再次被调度到threadA并且释放了该锁后,threadB才可以继续往下跑。
————————————————
(4)、在中断中可以使用spin_lock吗?
如果支持中断嵌套的话,中断1执行spin_lock进入临界区后,中断2抢占了中断1,中断而试图spin_lock时将会拿不到锁,将会卡在那里,造成死锁。 所以在支持中断嵌套的系统中,应当使用spin_lock_irq和spin_unlock_irq。
哈哈,非常遗憾的告诉您,目前的Linux Kernel系统是不支持中断嵌套的哦,所以在您的系统的中断处理函数中,你用啥(spin_lock、spin_lock_irq、或都不用)效果都是一样的。
3、信号量
特点:
允许多个进程同时进入临界区,但大多数情况下只允许一个,即技术设置为1,这种又称之为二值信号量(互斥信号量)
信号量适合保护较长的临界区,因为竞争信号量时进程可睡眠,进程切换的代价很高。
(linux/include/linux/semaphore.h)
/ Please don’t access any members of this structure directly /
struct semaphore {
raw_spinlock_t lock;
unsigned int count;
struct list_head wait_list;
};
具体定义如下所示:
静态初始化信号量
define __SEMAPHORE_INITIALIZER(name, n) //指定名称和计数,允许n个进程同时进入临界区
define DEFINE_SEMAPHORE(name) //初始化一个互斥信号量
动态初始化信号量
static inline void sema_init(struct semaphore *sem, int val) // 运行时动态初始化信号量
void down(struct semaphore *sem); //获取信号量,如果值为0,则进入深度睡眠
int __must_check down_interruptible(struct semaphore *sem); //获取信号量,如果值为0,则进入轻度睡眠
int __must_check down_killable(struct semaphore *sem); //获取信号量,如果值为0,则进入中度睡眠
int __must_check down_trylock(struct semaphore *sem); //获取信号量,如果值为0,则进程不等待
int __must_check down_timeout(struct semaphore *sem, long jiffies); //获取信号量,指定等待时间
void up(struct semaphore *sem); //释放信号量
具体实现如下所示:
down() —> __down() —>__down_common()
down :
如果信号量值大于1,则给指减去1 (当然这里操作count的时候受spin_lock的保护)
如果值不大于1(即等于0),则进入睡眠等待,进入睡眠等待的流程如下:
(1)将当前进程task加入到waiter_list中
(2)调用schedule_timeout(timeout)将cpu调度出去
(linux/kernel/locking/semaphore.c)
void down(struct semaphore *sem)
{
unsigned long flags;
raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(sem->count > 0))
sem->count--;
else
__down(sem);
raw_spin_unlock_irqrestore(&sem->lock, flags);
}
EXPORT_SYMBOL(down);
static noinline void __sched __down(struct semaphore *sem)
{
__down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}
static inline int __sched __down_common(struct semaphore *sem, long state,
long timeout)
{
struct semaphore_waiter waiter;
list_add_tail(&waiter.list, &sem->wait_list);
waiter.task = current;
waiter.up = false;
for (;;) {
if (signal_pending_state(state, current))
goto interrupted;
if (unlikely(timeout <= 0))
goto timed_out;
__set_current_state(state);
raw_spin_unlock_irq(&sem->lock);
timeout = schedule_timeout(timeout);
raw_spin_lock_irq(&sem->lock);
if (waiter.up)
return 0;
}
timed_out:
list_del(&waiter.list);
return -ETIME;
interrupted:
list_del(&waiter.list);
return -EINTR;
}
up() —> __up()
up:
如果等待的进程列表(waiter_list)为空,则给信号量值加1
如果等待的进程列表(waiter_list)不为空,则从队列中唤醒一个等待的进程wake_up_process(waiter->task)
(linux/kernel/locking/semaphore.c)
void up(struct semaphore *sem)
{
unsigned long flags;
raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(list_empty(&sem->wait_list)))
sem->count++;
else
__up(sem);
raw_spin_unlock_irqrestore(&sem->lock, flags);
}
EXPORT_SYMBOL(up);
static noinline void __sched __up(struct semaphore *sem)
{
struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
struct semaphore_waiter, list);
list_del(&waiter->list);
waiter->up = true;
wake_up_process(waiter->task);
}
4、互斥锁
特点:
只允许1个进程同时进入临界区,适合保护较长的临界区,因为竞争互斥锁时进程可睡眠,进程切换的代价很高。
二值信号量(互斥信号量) 等价于 互斥锁
(linux/include/linux/mutex.h)
struct mutex {
atomic_long_t owner;
spinlock_t wait_lock;
ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; / Spinner MCS lock /
endif
struct list_head wait_list;
ifdef CONFIG_DEBUG_MUTEXES
void *magic;
endif
ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
endif
};
具体定义如下所示:
DEFINE_MUTEX(mutexname) //静态初始化互斥锁
mutex_init(mutex) //动态初始化互斥锁
void mutex_lock(struct mutex *lock); // 申请互斥锁,如果该锁被占有则进入深度睡眠
int __must_check mutex_lock_interruptible(struct mutex *lock); // 申请互斥锁,如果该锁被占有则进入轻度睡眠
int __must_check mutex_lock_killable(struct mutex *lock); // 申请互斥锁,如果该锁被占有则进入中度睡眠
void mutex_lock_io(struct mutex *lock);
int mutex_trylock(struct mutex *lock); // 申请互斥锁,如果该锁被占有,则不等待,进程返回
void mutex_unlock(struct mutex *lock); // 释放互斥锁
具体实现如下所示:
mutex_lock() —> __mutex_lock_slowpath() —>__mutex_lock() ---->__mutex_lock_common()
(linux/kernel/locking/mutex.c)
void __sched mutex_lock(struct mutex *lock)
{
might_sleep();
if (!__mutex_trylock_fast(lock))
__mutex_lock_slowpath(lock);
}
EXPORT_SYMBOL(mutex_lock);
static noinline void __sched __mutex_lock_slowpath(struct mutex *lock)
{
__mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);
}
static int __sched __mutex_lock(struct mutex *lock, long state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip)
{
return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);
}
static __always_inline int __sched __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip,
struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
struct mutex_waiter waiter;
bool first = false;
struct ww_mutex *ww;
int ret;
if (!use_ww_ctx)
ww_ctx = NULL;
might_sleep();
ifdef CONFIG_DEBUG_MUTEXES
DEBUG_LOCKS_WARN_ON(lock->magic != lock);
endif
ww = container_of(lock, struct ww_mutex, base);
if (ww_ctx) {
if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
return -EALREADY;
/*
* Reset the wounded flag after a kill. No other process can
* race and wound us here since they can't have a valid owner
* pointer if we don't have any locks held.
*/
if (ww_ctx->acquired == 0)
ww_ctx->wounded = 0;
}
preempt_disable();
mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);
if (__mutex_trylock(lock) ||
mutex_optimistic_spin(lock, ww_ctx, NULL)) {
/* got the lock, yay! */
lock_acquired(&lock->dep_map, ip);
if (ww_ctx)
ww_mutex_set_context_fastpath(ww, ww_ctx);
preempt_enable();
return 0;
}
spin_lock(&lock->wait_lock);
/*
- After waiting to acquire the wait_lock, try again.
*/
if (__mutex_trylock(lock)) {
if (ww_ctx)
__ww_mutex_check_waiters(lock, ww_ctx);
goto skip_wait;
}
debug_mutex_lock_common(lock, &waiter);
lock_contended(&lock->dep_map, ip);
if (!use_ww_ctx) {
/* add waiting tasks to the end of the waitqueue (FIFO): */
__mutex_add_waiter(lock, &waiter, &lock->wait_list);
ifdef CONFIG_DEBUG_MUTEXES
waiter.ww_ctx = MUTEX_POISON_WW_CTX;
endif
} else {
/*
* Add in stamp order, waking up waiters that must kill
* themselves.
*/
ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);
if (ret)
goto err_early_kill;
waiter.ww_ctx = ww_ctx;
}
waiter.task = current;
set_current_state(state);
for (;😉 {
/*
* Once we hold wait_lock, we're serialized against
* mutex_unlock() handing the lock off to us, do a trylock
* before testing the error conditions to make sure we pick up
* the handoff.
*/
if (__mutex_trylock(lock))
goto acquired;
/*
* Check for signals and kill conditions while holding
* wait_lock. This ensures the lock cancellation is ordered
* against mutex_unlock() and wake-ups do not go missing.
*/
if (signal_pending_state(state, current)) {
ret = -EINTR;
goto err;
}
if (ww_ctx) {
ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);
if (ret)
goto err;
}
spin_unlock(&lock->wait_lock);
schedule_preempt_disabled();
/*
* ww_mutex needs to always recheck its position since its waiter
* list is not FIFO ordered.
*/
if (ww_ctx || !first) {
first = __mutex_waiter_is_first(lock, &waiter);
if (first)
__mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
}
set_current_state(state);
/*
* Here we order against unlock; we must either see it change
* state back to RUNNING and fall through the next schedule(),
* or we must see its unlock and acquire.
*/
if (__mutex_trylock(lock) ||
(first && mutex_optimistic_spin(lock, ww_ctx, &waiter)))
break;
spin_lock(&lock->wait_lock);
}
spin_lock(&lock->wait_lock);
acquired:
__set_current_state(TASK_RUNNING);
if (ww_ctx) {
/*
* Wound-Wait; we stole the lock (!first_waiter), check the
* waiters as anyone might want to wound us.
*/
if (!ww_ctx->is_wait_die &&
!__mutex_waiter_is_first(lock, &waiter))
__ww_mutex_check_waiters(lock, ww_ctx);
}
__mutex_remove_waiter(lock, &waiter);
debug_mutex_free_waiter(&waiter);
skip_wait:
/ got the lock - cleanup and rejoice! /
lock_acquired(&lock->dep_map, ip);
if (ww_ctx)
ww_mutex_lock_acquired(ww, ww_ctx);
spin_unlock(&lock->wait_lock);
preempt_enable();
return 0;
err:
__set_current_state(TASK_RUNNING);
__mutex_remove_waiter(lock, &waiter);
err_early_kill:
spin_unlock(&lock->wait_lock);
debug_mutex_free_waiter(&waiter);
mutex_release(&lock->dep_map, ip);
preempt_enable();
return ret;
}
5、当前处理器的互斥技术
禁止内核抢占
preempt_enable()、preempt_disable()
禁止软中断
local_bh_enable()、local_bh_disable()
禁止硬中断
local_irq_enable()、local_irq_disable()
添加威♥:sami01_2023,回复ARM中文,领取ARM中文手册