static inline void spin_lock(spinlock_t *lock)
{
raw_spin_lock(&lock->rlock);
}
/*
* Define the various spin_lock methods. Note we define these
* regardless of whether CONFIG_SMP or CONFIG_PREEMPT are set. The
* various methods are defined as nops in the case they are not
* required.
*/
#define raw_spin_trylock(lock) __cond_lock(lock, _raw_spin_trylock(lock))
//linux/include/linux/compiler.h
# define __cond_lock(x,c) (c)
//linux/include/linux/spinlock_api_up.h
#define _raw_spin_trylock(lock) ({ __LOCK(lock); 1; })
/*
* In the UP-nondebug case there's no real locking going on, so the
* only thing we have to do is to keep the preempt counts and irq
* flags straight, to suppress compiler warnings of unused lock
* variables, and to add the proper checker annotations:
*/
#define ___LOCK(lock) \
do { __acquire(lock); (void)(lock); } while (0)
//preempt_disable保证进程在临界区时不会被中断,来预防导致死锁发生
#define __LOCK(lock) \
do { preempt_disable(); ___LOCK(lock); } while (0)
//linux/include/linux/compiler.h
# define __acquire(x) __context__(x,1)
//这是一对用于sparse对代码检测的相互关联的函数定义,第一句表示要增加变量x的计数,增加量为1,第二句则正好相反,这个是用来函数编译的过程中。如果在代码中出现了不平衡的状况,那么在Sparse的检测中就会报警。
由上述通用代码追溯没有看到具体的实现,目前也不知道如何与不同平台架构的具体实现相关联。下面将分析内核代码中针对 ARM 平台arch_spin_lock的实现代码。
#include <thread>
#include <atomic>
#include <iostream>
#include <cstdio>
#include <mutex>
using namespace std;
using namespace this_thread;
using namespace chrono;
std::mutex my_mutex;
long total = 0;
static const int numthread = 50;
// 点击函数
void click()
{
my_mutex.lock();
for(int i=0; i<1000000;++i)
{
// 对全局数据进行加锁访问
total += 1;
}
my_mutex.unlock();
}
int main()
{
// 计时开始
clock_t start = clock();
// 创建100个线程模拟点击统计
std::thread mythread[numthread];
for (int i = 0; i < numthread;i++)
{
mythread[i] = std::thread(click);
}
for (int i = 0; i < numthread; i++)
{
mythread[i].join();
}
// 计时结束
clock_t finish = clock();
// 输出结果
cout<<"result:"<<total<<endl;
cout<<"duration:"<<finish -start<<"ms"<<endl;
return 0;
}
结果如下
result:50000000
duration:98213ms
➜ c++ ./a.out
result:50000000
duration:97476ms
➜ c++ ./a.out
result:50000000
duration:98720ms
➜ c++ ./a.out
result:50000000
duration:98079ms
不使用互斥锁结果如下,出错
➜ c++ ./a.out
result:6347619
duration:1140050ms
➜ c++ ./a.out
result:6945086
duration:1003280ms
➜ c++ ./a.out
result:4890718
duration:992402ms
➜ c++ ./a.out
result:7036019
duration:966157ms
可以看出c++使用互斥锁,比使用自旋锁或不使用锁,耗时少。
c中互斥锁的使用
编译命令cc xxx.c -lpthread
# include <stdio.h>
# include <pthread.h>
long total = 0;
static const int numthread = 50;
pthread_mutex_t mute;
// 点击函数
void *click(void *arg)
{
pthread_mutex_lock(&mute);
for(int i=0; i<1000000;++i)
{
// 对全局数据进行无锁访问
total += 1;
}
pthread_mutex_unlock(&mute);
}
int main()
{
// 计时开始
clock_t start = clock();
// 创建100个线程模拟点击统计
pthread_t mythreads[numthread];
int thread_id[numthread];
for(int i=0; i<numthread; ++i){
thread_id[i] = i;
pthread_create(&mythreads[i], NULL, click, (void *)&thread_id[i]);
}
for (int i = 0; i < numthread; i++)
{
int rc = pthread_join(mythreads[i], NULL);
}
// 计时结束
clock_t finish = clock();
// 输出结果
printf("result:%ld\n",total);
printf("result:%ld\n",finish -start);
return 0;
}
结果如下
result:50000000
result:99046
➜ c ./a.out
result:50000000
result:100372
➜ c ./a.out
result:50000000
result:99839
不使用互斥锁时,结果如下,可以看出使用互斥锁后程序耗时更少。
result:5512872
result:981536
➜ c ./a.out
result:5164347
result:981757
➜ c ./a.out
result:6051030
result:963732
➜ c ./a.out
result:7926930
result:967352
static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
ktime_t *abs_time, u32 bitset)
{
struct hrtimer_sleeper timeout, *to;
struct restart_block *restart;
struct futex_hash_bucket *hb;
struct futex_q q = futex_q_init;
int ret;
if (!bitset)
return -EINVAL;
q.bitset = bitset;
//设置定时任务,如果一定时间后进程还没被唤醒这唤醒wait的线程
to = futex_setup_timer(abs_time, &timeout, flags,
current->timer_slack_ns);
retry:
/*
* Prepare to wait on uaddr. On success, holds hb lock and increments
* q.key refs.
*/
//该函数中,将用户空间值写入内核空间,若成功,返回0,以及一些初始化操作
ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
if (ret)
goto out;
//将当前进程状态改为TASK_INTERRUPTIBLE,并插入到futex等待队列,然后重新调度。
/* queue_me and wait for wakeup, timeout, or a signal. */
futex_wait_queue_me(hb, &q, to);
/* If we were woken (and unqueued), we succeeded, whatever. */
ret = 0;
//如果unqueue_me成功,则说明是超时触发(因为futex_wake唤醒时,会将该进程移出等待队列,所以这里会失败
/* unqueue_me() drops q.key ref */
if (!unqueue_me(&q))
goto out;
ret = -ETIMEDOUT;
if (to && !to->task)
goto out;
/*
* We expect signal_pending(current), but we might be the
* victim of a spurious wakeup as well.
*/
if (!signal_pending(current))
goto retry;
ret = -ERESTARTSYS;
if (!abs_time)
goto out;
restart = ¤t->restart_block;
restart->fn = futex_wait_restart;
restart->futex.uaddr = uaddr;
restart->futex.val = val;
restart->futex.time = *abs_time;
restart->futex.bitset = bitset;
restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;
ret = -ERESTART_RESTARTBLOCK;
out:
if (to) {
hrtimer_cancel(&to->timer);
destroy_hrtimer_on_stack(&to->timer);
}
return ret;
}
在将进程阻塞前会将当期进程插入到一个全局唯一的等待队列中。 着重看futex_wait_setup
/**
* futex_wait_setup() - Prepare to wait on a futex
* @uaddr: the futex userspace address
* @val: the expected value
* @flags: futex flags (FLAGS_SHARED, etc.)
* @q: the associated futex_q
* @hb: storage for hash_bucket pointer to be returned to caller
*
* Setup the futex_q and locate the hash_bucket. Get the futex value and
* compare it with the expected value. Handle atomic faults internally.
* Return with the hb lock held and a q.key reference on success, and unlocked
* with no q.key reference on failure.
*
* Return:
* - 0 - uaddr contains val and hb has been locked;
* - <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked
*/
static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,
struct futex_q *q, struct futex_hash_bucket **hb)
{
u32 uval;
int ret;
/*
* Access the page AFTER the hash-bucket is locked.
* Order is important:
*
* Userspace waiter: val = var; if (cond(val)) futex_wait(&var, val);
* Userspace waker: if (cond(var)) { var = new; futex_wake(&var); }
*
* The basic logical guarantee of a futex is that it blocks ONLY
* if cond(var) is known to be true at the time of blocking, for
* any cond. If we locked the hash-bucket after testing *uaddr, that
* would open a race condition where we could block indefinitely with
* cond(var) false, which would violate the guarantee.
*
* On the other hand, we insert q and release the hash-bucket only
* after testing *uaddr. This guarantees that futex_wait() will NOT
* absorb a wakeup if *uaddr does not match the desired values
* while the syscall executes.
*/
retry:
ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key, FUTEX_READ);
if (unlikely(ret != 0))
return ret;
retry_private:
//获得自旋锁
*hb = queue_lock(q);
//把用户空间的值放到内核空间中,即原子的将uaddr的值设置到uval中,成功返回0
ret = get_futex_value_locked(&uval, uaddr);
if (ret) {
queue_unlock(*hb);
ret = get_user(uval, uaddr);
if (ret)
return ret;
if (!(flags & FLAGS_SHARED))
goto retry_private;
goto retry;
}
//如果当期uaddr指向的值不等于val,即说明其他进程修改了
//uaddr指向的值,等待条件不再成立,不用阻塞直接返回
if (uval != val) {
//释放锁
queue_unlock(*hb);
ret = -EWOULDBLOCK;
}
return ret;
}
utex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
struct futex_hash_bucket *hb;
struct futex_q *this, *next;
union futex_key key = FUTEX_KEY_INIT;
int ret;
DEFINE_WAKE_Q(wake_q);
if (!bitset)
return -EINVAL;
//根据uaddr的值填充&key的内容
ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, FUTEX_READ);
if (unlikely(ret != 0))
return ret;
//根据&key获得对应uaddr所在的futex_hash_bucket
hb = hash_futex(&key);
/* Make sure we really have tasks to wakeup */
if (!hb_waiters_pending(hb))
return ret;
spin_lock(&hb->lock);
//遍历该hb的链表,注意链表中存储的节点是plist_node类型,而而这里的this却是futex_q类型,这种类型转换是通过c中的container_of机制实现的
plist_for_each_entry_safe(this, next, &hb->chain, list) {
if (match_futex (&this->key, &key)) {
if (this->pi_state || this->rt_waiter) {
ret = -EINVAL;
break;
}
/* Check if one of the bits is set in both bitsets */
if (!(this->bitset & bitset))
continue;
mark_wake_futex(&wake_q, this);
if (++ret >= nr_wake)
break;
}
}
spin_unlock(&hb->lock);
//唤醒对应进程
wake_up_q(&wake_q);
return ret;
}