处理器如何实现原子操作
[TOC]
背景
临界区:每个进程中访问临界资源的那段程序称为临界区(临界资源是一次仅允许一个进程使用的共享资源)。每次只准许一个进程进入临界区,进入后不允许其他进程进入。
进程进入临界区的调度原则: ①如果有若干进程要求进入空闲的临界区,一次仅允许一个进程进入。 ②任何时候,处于临界区内的进程不可多于一个。如已有进程进入自己的临界区,则其它所有试图进入临界区的进程必须等待。 ③进入临界区的进程要在有限时间内退出,以便其它进程能及时进入自己的临界区。 ④如果进程不能进入自己的临界区,则应让出CPU,避免进程出现“忙等”现象。
锁主要保护的是临界资源,我们可以用一个状态位(bit)的状态来表示临界区是否空闲(,1:空闲;0:繁忙),当进程1读取到状态位的状态为1(空闲时),将此状态位状态设置为0(繁忙),并进入临界区处理临界资源,处理完后,将此状态位状态设置为1(空闲)。若在程序1进入临界区后,进程2也想进入临界区将要先判断状态位的状态,因为进程1在进入临界区前已经将状态位的值设置为0(繁忙),所以进程2就不能进入临界区,而是要等待。 有上可知,实现锁的前提是对状态位状态的读取设置操作是原子的,即进程1没有完成对状态位的读取-判断-设置这一系列操作之前,进程2不能对状态位的状态进行读取。
由处理器如何实现原子操作.这篇文章,可以知道操作系统有保证操作的原子性的机制(总线锁lock机制)。既然这样就可以实现原子性操作,比如对某个位进行读取设置的原子性操作。在c++中,提供了atomic原子库,提供了对一些常用数据类型进行原子操作的接口。
自旋锁
自旋锁是一种busy-waiting的锁,即进程如果申请不到锁,会一直不断地循环检查锁(状态位)是否可用(会消耗cpu时间),直到获取到这个锁为止。 优点:不会使进程状态发生切换,即进程一直处于active状态,不会进入阻塞状态,获得锁后,不用进行上下文切换,执行速度快。 缺点:没有获得锁前,会一直消耗cpu时间。
上下文切换:(也叫进程切换、任务切换)从正在运行的的进程中收回处理器,即把进程存放在处理器中的中间数据找个地方存起来。
C++实现自旋锁
c++标准库没有提供自旋锁,可以利用atomic_flag实现。 std::atomic_flag是一个原子的布尔类型,可支持两种原子操作: 1.test_and_set: 如果atomic_flag对象被设置,则返回true; 如果atomic_flag对象未被设置,则设置之,返回false 2.clear: 清楚atomic_flag对象
使用自旋锁代码
#include <thread>
#include <atomic>
#include <iostream>
#include <cstdio>
using namespace std;
class spin_lock {
private:
atomic_flag flag;
public:
spin_lock() = default;
spin_lock(const spin_lock&) = delete;
spin_lock& operator=(const spin_lock) = delete;
void lock() { //acquire spin lock
while (flag.test_and_set());
}
void unlock() { //release spin lock
flag.clear();
}
};
spin_lock splock;
long total = 0;
static const int numthread = 50;
// 点击函数
void click()
{
splock.lock();
for(int i=0; i<1000000;++i)
{
// 对全局数据进行加锁访问
total += 1;
}
splock.unlock();
}
int main()
{
// 计时开始
clock_t start = clock();
// 创建100个线程模拟点击统计
std::thread mythread[numthread];
for (int i = 0; i < numthread;i++)
{
mythread[i] = std::thread(click);
}
for (int i = 0; i < numthread; i++)
{
mythread[i].join();
}
// 计时结束
clock_t finish = clock();
// 输出结果
cout<<"result:"<<total<<endl;
cout<<"duration:"<<finish -start<<"ms"<<endl;
return 0;
}
运行结果如下,没有出错
jer@jer:/media/sf_share$ ./a.out
result:50000000
duration:3657599ms
jer@jer:/media/sf_share$ ^C
jer@jer:/media/sf_share$ ./a.out
result:50000000
duration:3015523ms
jer@jer:/media/sf_share$ ./a.out
result:50000000
duration:2723883ms
jer@jer:/media/sf_share$ ./a.out
result:50000000
duration:2923082ms
jer@jer:/media/sf_share$ ./a.out
result:50000000
duration:2546655ms
jer@jer:/media/sf_share$ ./a.out
result:50000000
duration:3308527ms
不使用自旋锁代码
下面是不使用自旋锁的代码
#include <thread>
#include <atomic>
#include <iostream>
#include <cstdio>
using namespace std;
class spin_lock {
private:
atomic_flag flag;
public:
spin_lock() = default;
spin_lock(const spin_lock&) = delete;
spin_lock& operator=(const spin_lock) = delete;
void lock() { //acquire spin lock
while (flag.test_and_set());
}
void unlock() { //release spin lock
flag.clear();
}
};
spin_lock splock;
long total = 0;
static const int numthread = 50;
// 点击函数
void click()
{
//splock.lock();
for(int i=0; i<1000000;++i)
{
// 对全局数据进行无锁访问
total += 1;
}
//splock.unlock();
}
int main()
{
// 计时开始
clock_t start = clock();
// 创建100个线程模拟点击统计
std::thread mythread[numthread];
for (int i = 0; i < numthread;i++)
{
mythread[i] = std::thread(click);
}
for (int i = 0; i < numthread; i++)
{
mythread[i].join();
}
// 计时结束
clock_t finish = clock();
// 输出结果
cout<<"result:"<<total<<endl;
cout<<"duration:"<<finish -start<<"ms"<<endl;
return 0;
}
结果如下,运行比加锁后时间少,但出错:
jer@jer:/media/sf_share$ ./a.out
result:12390595
duration:191067ms
jer@jer:/media/sf_share$ ./a.out
result:9231221
duration:192267ms
jer@jer:/media/sf_share$ ./a.out
result:5343771
duration:190760ms
jer@jer:/media/sf_share$ ./a.out
result:8376018
duration:199207ms
jer@jer:/media/sf_share$ ./a.out
result:17075471
duration:191034ms
jer@jer:/media/sf_share$ ./a.out
result:8323132
duration:190742ms
jer@jer:/media/sf_share$ ./a.out
result:2222865
duration:192213ms
注:如何把锁加在for循环里,运行非常久,原因未知,代码如下
void click()
{
for(int i=0; i<1000000;++i)
{
splock.lock();
// 对全局数据进行加锁访问
total += 1;
splock.unlock();
}
}
C实现自旋锁
linux内核有定义自旋锁的接口,定义在<linux/include/linux/spinlock.h>,但是没有系统调用自旋锁的入口( syscall() 执行一个系统调用,include/linux/syscalls.h文件中定义有所用系统调用函数的原型),在include/linux/syscalls.h没有找到有自旋锁的相关接口。所以在用c编写应用程序时,不能直接调用此函数。
linux自旋锁实现
链接: linux kernel源码镜像地址.
自旋锁的基本使用形式有两种:
spinlock_t lock = SPIN_LOCK_UNLOCKED;
spin_lock(&lock);
/* 临界区 */
spin_unlock(&lock);
或
DEFINE_SPINLOCK(mylock);
spin_lock(&mylock);
/* 临界区 */
spin_unlock(&mylock);
spinlock_t的定义在<linux/include/linux/spinlock_types.h>
typedef struct spinlock {
union {
struct raw_spinlock rlock;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
struct {
u8 __padding[LOCK_PADSIZE];
struct lockdep_map dep_map;
};
#endif
};
} spinlock_t;
typedef struct raw_spinlock {
arch_spinlock_t raw_lock;
#ifdef CONFIG_GENERIC_LOCKBREAK
unsigned int break_lock;
#endif
#ifdef CONFIG_DEBUG_SPINLOCK
unsigned int magic, owner_cpu;
void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
} raw_spinlock_t;
spin_lock()的定义在linux/include/linux/spinlock.h
static inline void spin_lock(spinlock_t *lock)
{
raw_spin_lock(&lock->rlock);
}
/*
* Define the various spin_lock methods. Note we define these
* regardless of whether CONFIG_SMP or CONFIG_PREEMPT are set. The
* various methods are defined as nops in the case they are not
* required.
*/
#define raw_spin_trylock(lock) __cond_lock(lock, _raw_spin_trylock(lock))
//linux/include/linux/compiler.h
# define __cond_lock(x,c) (c)
//linux/include/linux/spinlock_api_up.h
#define _raw_spin_trylock(lock) ({ __LOCK(lock); 1; })
/*
* In the UP-nondebug case there's no real locking going on, so the
* only thing we have to do is to keep the preempt counts and irq
* flags straight, to suppress compiler warnings of unused lock
* variables, and to add the proper checker annotations:
*/
#define ___LOCK(lock) \
do { __acquire(lock); (void)(lock); } while (0)
//preempt_disable保证进程在临界区时不会被中断,来预防导致死锁发生
#define __LOCK(lock) \
do { preempt_disable(); ___LOCK(lock); } while (0)
//linux/include/linux/compiler.h
# define __acquire(x) __context__(x,1)
//这是一对用于sparse对代码检测的相互关联的函数定义,第一句表示要增加变量x的计数,增加量为1,第二句则正好相反,这个是用来函数编译的过程中。如果在代码中出现了不平衡的状况,那么在Sparse的检测中就会报警。
由上述通用代码追溯没有看到具体的实现,目前也不知道如何与不同平台架构的具体实现相关联。下面将分析内核代码中针对 ARM 平台arch_spin_lock的实现代码。
ARM32 平台arch_spin_lock
路径: linux/arch/arm/include/asm/spinlock.h; linux/arch/arm/include/asm/spinlock_types.h;
typedef struct {
union { //此处为联合体
u32 slock;
struct __raw_tickets {
#ifdef __ARMEB__
u16 next;
u16 owner;
#else
u16 owner;
u16 next;
#endif
} tickets;
};
} arch_spinlock_t;
/*
* ARMv6 ticket-based spin-locking.
*
* A memory barrier is required after we get a lock, and before we
* release it, because V6 CPUs are assumed to have weakly ordered
* memory.
*/
static inline void arch_spin_lock(arch_spinlock_t *lock)
{
unsigned long tmp;
u32 newval;
arch_spinlock_t lockval;
//和preloading cache相关的操作,主要是为了性能考虑
prefetchw(&lock->slock);
__asm__ __volatile__(
//读取锁的值赋值给lockval ,lockval = lock->slock (如果lock->slock没有被其他处理器独占,则标记当前执行处理器对lock->slock地址的独占访问;否则不影响,此标记会在strex指令向内存写数据时用到)
"1: ldrex %0, [%3]\n"
//将next++之后的值存在newval中,newval = lockval + (1 << TICKET_SHIFT)
" add %1, %0, %4\n"
//strex tmp, newval, [&lock->slock] (如果当前执行处理器没有独占lock->slock地址的访问,不进行存储,返回1给temp;如果当前处理器已经独占lock->slock内存访问,则对内存进行写,返回0给temp,清除独占标记) lock->tickets.next = lock->tickets.next + 1
" strex %2, %1, [%3]\n"
//判断上条指令是否成功,如果不成功执行”bne 1b”跳到标号1执行,if(tmp == 0)
" teq %2, #0\n"
" bne 1b"
: "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
: "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
: "cc");
//当tickets中的next和owner不相等的时候,说明临界区在忙, 需要等待。然后cpu会执行wfe指令。当其他cpu忙完之后,会更新owner的值,如果owner的值如果与next值相同,那到next号的cpu执行。
while (lockval.tickets.next != lockval.tickets.owner) {
wfe();
lockval.tickets.owner = READ_ONCE(lock->tickets.owner);
}
smp_mb();
}
由上可知其实关键是指令ldrex和strex,他们保证了对某块内存进行读写的原子操作。
x86_64 平台arch_spin_lock
在linux 的main主线源码的/linux/arch/x86_64没看到有相关源码文件,不知x86平台spin lock的实现原理。
互斥锁
互斥锁:用于保护临界区,确保同一时间只有一个线程访问数据。对共享资源的访问,先对互斥量进行加锁,如果互斥量已经上锁,调用线程会阻塞,直到互斥量被解锁。在完成了对共享资源的访问后,要对互斥量进行解锁。 相比自旋锁,互斥锁会在等待期间放弃cpu。
c++中互斥锁的使用
编译命令g++ mutex_lock.cpp -lpthread
#include <thread>
#include <atomic>
#include <iostream>
#include <cstdio>
#include <mutex>
using namespace std;
using namespace this_thread;
using namespace chrono;
std::mutex my_mutex;
long total = 0;
static const int numthread = 50;
// 点击函数
void click()
{
my_mutex.lock();
for(int i=0; i<1000000;++i)
{
// 对全局数据进行加锁访问
total += 1;
}
my_mutex.unlock();
}
int main()
{
// 计时开始
clock_t start = clock();
// 创建100个线程模拟点击统计
std::thread mythread[numthread];
for (int i = 0; i < numthread;i++)
{
mythread[i] = std::thread(click);
}
for (int i = 0; i < numthread; i++)
{
mythread[i].join();
}
// 计时结束
clock_t finish = clock();
// 输出结果
cout<<"result:"<<total<<endl;
cout<<"duration:"<<finish -start<<"ms"<<endl;
return 0;
}
结果如下
result:50000000
duration:98213ms
➜ c++ ./a.out
result:50000000
duration:97476ms
➜ c++ ./a.out
result:50000000
duration:98720ms
➜ c++ ./a.out
result:50000000
duration:98079ms
不使用互斥锁结果如下,出错
➜ c++ ./a.out
result:6347619
duration:1140050ms
➜ c++ ./a.out
result:6945086
duration:1003280ms
➜ c++ ./a.out
result:4890718
duration:992402ms
➜ c++ ./a.out
result:7036019
duration:966157ms
可以看出c++使用互斥锁,比使用自旋锁或不使用锁,耗时少。
c中互斥锁的使用
编译命令cc xxx.c -lpthread
# include <stdio.h>
# include <pthread.h>
long total = 0;
static const int numthread = 50;
pthread_mutex_t mute;
// 点击函数
void *click(void *arg)
{
pthread_mutex_lock(&mute);
for(int i=0; i<1000000;++i)
{
// 对全局数据进行无锁访问
total += 1;
}
pthread_mutex_unlock(&mute);
}
int main()
{
// 计时开始
clock_t start = clock();
// 创建100个线程模拟点击统计
pthread_t mythreads[numthread];
int thread_id[numthread];
for(int i=0; i<numthread; ++i){
thread_id[i] = i;
pthread_create(&mythreads[i], NULL, click, (void *)&thread_id[i]);
}
for (int i = 0; i < numthread; i++)
{
int rc = pthread_join(mythreads[i], NULL);
}
// 计时结束
clock_t finish = clock();
// 输出结果
printf("result:%ld\n",total);
printf("result:%ld\n",finish -start);
return 0;
}
结果如下
result:50000000
result:99046
➜ c ./a.out
result:50000000
result:100372
➜ c ./a.out
result:50000000
result:99839
不使用互斥锁时,结果如下,可以看出使用互斥锁后程序耗时更少。
result:5512872
result:981536
➜ c ./a.out
result:5164347
result:981757
➜ c ./a.out
result:6051030
result:963732
➜ c ./a.out
result:7926930
result:967352
pthread_mutex_lock的实现原理
在Linux下,信号量和线程互斥锁的实现都是通过futex系统调用实现。 Futex 是一个提升效率的机制。在Unix系统中,传统的进程间同步机制都是通过对内核对象操作来完成的,这个内核对象在需要同步的进程中都是可见的,进程间的同步是通过系统调用在内核中完成。这种同步方式因为涉及用户态和内核态的切换,效率比较低。而且只要使用了传统的同步机制,进入临界区时即使没有其他的进程竞争也必须切换到内核态来检查内核同步对象的状态,这种不必要的切换显然带来了大量的浪费。 Futex就是为了解决这个问题而诞生的。Futex是一种用户态和内核态混合的同步机制,使用Futex同步机制,如果用于进程间同步,需要先创建一块共享内存,Futex变量就位于共享区。同时对Futex变量的操作必须是原子的,当进程试图进入临界区或者退出临界区的时候,首先检查共享内存中的Futex变量,如果没有其他的进程也申请使用临界区,则只修改Futex变量而不再执行系统调用。如果同时有其他进程在申请使用临界区,还是需要通过系统调用去执行等待或唤醒操作。这样通过用户态的Futex变量的控制,减少了进程在用户态和内核态之间切换的次数,从而减少了系统同步的开销。
pthread_mutex_lock的原理如下
pthread_mutex_lock:
atomic_dec(pthread_mutex_t.value);
if(pthread_mutex_t.value!=0)
futex(WAIT)
else
success
pthread_mutex_unlock:
atomic_inc(pthread_mutex_t.value);
if(pthread_mutex_t.value!=1)
futex(WAKEUP)
else
success
在用户空间的共享变量其实在使用mutex时定义的,pthread_mutex_t类型的变量里。 futex源码在/linux/kernel/futex.c中,主要是futex_wait和futex_wake两个函数
futex_wait
static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
ktime_t *abs_time, u32 bitset)
{
struct hrtimer_sleeper timeout, *to;
struct restart_block *restart;
struct futex_hash_bucket *hb;
struct futex_q q = futex_q_init;
int ret;
if (!bitset)
return -EINVAL;
q.bitset = bitset;
//设置定时任务,如果一定时间后进程还没被唤醒这唤醒wait的线程
to = futex_setup_timer(abs_time, &timeout, flags,
current->timer_slack_ns);
retry:
/*
* Prepare to wait on uaddr. On success, holds hb lock and increments
* q.key refs.
*/
//该函数中,将用户空间值写入内核空间,若成功,返回0,以及一些初始化操作
ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
if (ret)
goto out;
//将当前进程状态改为TASK_INTERRUPTIBLE,并插入到futex等待队列,然后重新调度。
/* queue_me and wait for wakeup, timeout, or a signal. */
futex_wait_queue_me(hb, &q, to);
/* If we were woken (and unqueued), we succeeded, whatever. */
ret = 0;
//如果unqueue_me成功,则说明是超时触发(因为futex_wake唤醒时,会将该进程移出等待队列,所以这里会失败
/* unqueue_me() drops q.key ref */
if (!unqueue_me(&q))
goto out;
ret = -ETIMEDOUT;
if (to && !to->task)
goto out;
/*
* We expect signal_pending(current), but we might be the
* victim of a spurious wakeup as well.
*/
if (!signal_pending(current))
goto retry;
ret = -ERESTARTSYS;
if (!abs_time)
goto out;
restart = ¤t->restart_block;
restart->fn = futex_wait_restart;
restart->futex.uaddr = uaddr;
restart->futex.val = val;
restart->futex.time = *abs_time;
restart->futex.bitset = bitset;
restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;
ret = -ERESTART_RESTARTBLOCK;
out:
if (to) {
hrtimer_cancel(&to->timer);
destroy_hrtimer_on_stack(&to->timer);
}
return ret;
}
在将进程阻塞前会将当期进程插入到一个全局唯一的等待队列中。 着重看futex_wait_setup
/**
* futex_wait_setup() - Prepare to wait on a futex
* @uaddr: the futex userspace address
* @val: the expected value
* @flags: futex flags (FLAGS_SHARED, etc.)
* @q: the associated futex_q
* @hb: storage for hash_bucket pointer to be returned to caller
*
* Setup the futex_q and locate the hash_bucket. Get the futex value and
* compare it with the expected value. Handle atomic faults internally.
* Return with the hb lock held and a q.key reference on success, and unlocked
* with no q.key reference on failure.
*
* Return:
* - 0 - uaddr contains val and hb has been locked;
* - <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked
*/
static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,
struct futex_q *q, struct futex_hash_bucket **hb)
{
u32 uval;
int ret;
/*
* Access the page AFTER the hash-bucket is locked.
* Order is important:
*
* Userspace waiter: val = var; if (cond(val)) futex_wait(&var, val);
* Userspace waker: if (cond(var)) { var = new; futex_wake(&var); }
*
* The basic logical guarantee of a futex is that it blocks ONLY
* if cond(var) is known to be true at the time of blocking, for
* any cond. If we locked the hash-bucket after testing *uaddr, that
* would open a race condition where we could block indefinitely with
* cond(var) false, which would violate the guarantee.
*
* On the other hand, we insert q and release the hash-bucket only
* after testing *uaddr. This guarantees that futex_wait() will NOT
* absorb a wakeup if *uaddr does not match the desired values
* while the syscall executes.
*/
retry:
ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key, FUTEX_READ);
if (unlikely(ret != 0))
return ret;
retry_private:
//获得自旋锁
*hb = queue_lock(q);
//把用户空间的值放到内核空间中,即原子的将uaddr的值设置到uval中,成功返回0
ret = get_futex_value_locked(&uval, uaddr);
if (ret) {
queue_unlock(*hb);
ret = get_user(uval, uaddr);
if (ret)
return ret;
if (!(flags & FLAGS_SHARED))
goto retry_private;
goto retry;
}
//如果当期uaddr指向的值不等于val,即说明其他进程修改了
//uaddr指向的值,等待条件不再成立,不用阻塞直接返回
if (uval != val) {
//释放锁
queue_unlock(*hb);
ret = -EWOULDBLOCK;
}
return ret;
}
函数futex_wait_setup中主要做了两件事,一是获得自旋锁,二是判断*uaddr是否为预期值。
futex_wait流程:
加自旋锁
检测*uaddr是否等于val,如果不相等则会立即返回
将进程状态设置为TASK_INTERRUPTIBLE
将当期进程插入到等待队列中
释放自旋锁
创建定时任务:当超过一定时间还没被唤醒时,将进程唤醒
挂起当前进程
futex_wake
utex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
struct futex_hash_bucket *hb;
struct futex_q *this, *next;
union futex_key key = FUTEX_KEY_INIT;
int ret;
DEFINE_WAKE_Q(wake_q);
if (!bitset)
return -EINVAL;
//根据uaddr的值填充&key的内容
ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, FUTEX_READ);
if (unlikely(ret != 0))
return ret;
//根据&key获得对应uaddr所在的futex_hash_bucket
hb = hash_futex(&key);
/* Make sure we really have tasks to wakeup */
if (!hb_waiters_pending(hb))
return ret;
spin_lock(&hb->lock);
//遍历该hb的链表,注意链表中存储的节点是plist_node类型,而而这里的this却是futex_q类型,这种类型转换是通过c中的container_of机制实现的
plist_for_each_entry_safe(this, next, &hb->chain, list) {
if (match_futex (&this->key, &key)) {
if (this->pi_state || this->rt_waiter) {
ret = -EINVAL;
break;
}
/* Check if one of the bits is set in both bitsets */
if (!(this->bitset & bitset))
continue;
mark_wake_futex(&wake_q, this);
if (++ret >= nr_wake)
break;
}
}
spin_unlock(&hb->lock);
//唤醒对应进程
wake_up_q(&wake_q);
return ret;
}
futex_wake流程如下:
找到uaddr对应的futex_hash_bucket,即代码中的hb
对hb加自旋锁
遍历fb的链表,找到uaddr对应的节点
调用wake_futex唤起等待的进程
释放自旋锁
Last updated
Was this helpful?