开源鸿蒙内核源码分析系列 | 读写锁 | 内核如何实现多读单写(转载)
特点&场景
读写锁 :是计算机程序的并发控制的一种同步机制,也称“共享-互斥锁”、多读者-单写者锁。读操作可并发重入,写操作是互斥的。鸿蒙实现的读写锁有几个特点:
- 一把锁分成 读/写 两种操作方式,读操作和写操作本身是互斥的,待操作任务按优先级存在两个独立的链表中,是读模式还是写模式,全凭任务的优先级而定,谁高就切到哪种模式。
- 一旦切到读模式,待读链表中优先级高于待写链表中最高优先级的任务们可以同时进行读操作,这些任务并行完成后便切到写模式。
- 一旦切到写模式,待写链表中优先级高于待读链表中最高优先级的任务们不可以同时进行写操作,只能一个个执行,这些任务串行完成后便切到读模式。
- 模式可以被高优先级任务打断,例如写模式时有高优先级的待读任务到来时,将切到读模式,反之亦然。
在实际很多业务场景中读写操作的频率是不同的,读往往高几个数量级,因读操作并不改变业务数据结构,所以读锁也称为共享锁。写操作会改变数据结构,数据之间须同步,修改注定是排他的,所以也称为排它锁/互斥锁,读写锁很好的解决了这种读写不对称的业务场景。
本篇详细解剖鸿蒙是如何实现读写锁的,代码部分均有注释,尤其释放锁部分的实现很精彩,让人有种酣畅淋漓的感觉。
开源鸿蒙实现
OsRwlock
系列篇多次说过,内核相对独立的功能 ,都有一个核心结构体 ,是一把吃透该功能的钥匙, 读写锁便是 LosRwlock。
typedef struct OsRwlock {//读写锁
INT32 magic:24; /**< Magic number | 魔法数字*/
INT32 rwCount:8; /**< Times of locking the rwlock, rwCount > 0 when rwkick is read mode, rwCount < 0
when the rwlock is write mode, rwCount = 0 when the lock is free.
大于0时为读模式 , 小于0时为写模式 等于0为自由模式
rwCount为读锁和写锁的数量,注意它们并不是此消彼长的行为模式*/
VOID *writeOwner; /**< The current write thread that is locking the rwlock | 拥有写权限的任务*/
LOS_DL_LIST readList; /**< Read waiting list | 等待读操作的任务链表*/
LOS_DL_LIST writeList; /**< Write waiting list | 等待写操作的任务链表*/
} LosRwlock;
解读:
- 魔法数字不会陌生,很多内核模块中都会使用到 ,比如栈顶就会有 ,以此判断栈是否溢出 (值被修改表示溢出)
- rwCount记录读或者写时的任务数量,读写锁分五种操作模式:
enum RwlockMode {
RWLOCK_NONE_MODE, ///< 自由模式: 读写链表都没有内容
RWLOCK_READ_MODE, ///< 读模式: 读链表有数据,写链表没有数据
RWLOCK_WRITE_MODE, ///< 写模式: 写链表有数据,读链表没有数据
RWLOCK_READFIRST_MODE, ///< 读优先模式: 读链表中的任务最高优先级高于写链表中任务最高优先级
RWLOCK_WRITEFIRST_MODE ///< 写优先模式: 写链表中的任务最高优先级高于读链表中任务最高优先级
};
- writeOwner 持有写权限的任务 ,写操作是互斥操作 ,这里记录下最后执行写操作的任务
- readList 所有等待读操作的任务都会挂到这里 ,并按优先级排好序
- writeList 所有等待写操作的任务都会挂到这里 ,并按优先级排好序
初始化读写锁
UINT32 LOS_RwlockInit(LosRwlock *rwlock)
{
UINT32 intSave;
if (rwlock == NULL) {
return LOS_EINVAL;
}
SCHEDULER_LOCK(intSave);
if ((rwlock->magic & RWLOCK_COUNT_MASK) == OS_RWLOCK_MAGIC) {
SCHEDULER_UNLOCK(intSave);
return LOS_EPERM;
}
rwlock->rwCount = 0;
rwlock->writeOwner = NULL;
LOS_ListInit(&(rwlock->readList));
LOS_ListInit(&(rwlock->writeList));
rwlock->magic = OS_RWLOCK_MAGIC;
SCHEDULER_UNLOCK(intSave);
return LOS_OK;
}
优先级 | 找位置
不论读/写,都得按优先级排序,所以当一个申请任务到来时,通过OsSchedLockPendFindPos找到合适的位置将挂入链表的操作就很重要了。
/// 找到合适的位置 将当前任务插入到 读/写锁链表中
LOS_DL_LIST *OsSchedLockPendFindPos(const LosTaskCB *runTask, LOS_DL_LIST *lockList)
{
LOS_DL_LIST *node = NULL;
if (LOS_ListEmpty(lockList)) {
node = lockList;
} else {
LosTaskCB *pendedTask1 = OS_TCB_FROM_PENDLIST(LOS_DL_LIST_FIRST(lockList));//找到首任务
LosTaskCB *pendedTask2 = OS_TCB_FROM_PENDLIST(LOS_DL_LIST_LAST(lockList)); //找到尾任务
if (pendedTask1->priority > runTask->priority) { //首任务比当前任务优先级低,所以runTask可以排第一
node = lockList->pstNext;//下一个,注意priority越大 优先级越低
} else if (pendedTask2->priority <= runTask->priority) {//尾任务比当前任务优先级高,所以runTask排最后
node = lockList;
} else {//两头比较没结果,陷入中间比较
node = OsSchedLockPendFindPosSub(runTask, lockList);//进入子级循环找
}
}
return node;
}
/// 找到合适的位置 将当前任务插入到 读/写锁链表中
STATIC INLINE LOS_DL_LIST *OsSchedLockPendFindPosSub(const LosTaskCB *runTask, const LOS_DL_LIST *lockList)
{
LosTaskCB *pendedTask = NULL;
LOS_DL_LIST *node = NULL;
LOS_DL_LIST_FOR_EACH_ENTRY(pendedTask, lockList, LosTaskCB, pendList) {//遍历阻塞链表
if (pendedTask->priority < runTask->priority) {//当前优先级更小,位置不宜,继续
continue;
} else if (pendedTask->priority > runTask->priority) {//找到合适位置
node = &pendedTask->pendList;
break;
} else {//找到合适位置
node = pendedTask->pendList.pstNext;
break;
}
}
return node;
}
针对本篇开始的问题二,等锁新任务来临后由任务优先级决定在queueList中的位置,OsFutexInsertTasktoPendList。
///< 将快锁挂到任务的阻塞链表上
STATIC INT32 OsFutexInsertTasktoPendList(FutexNode **firstNode, FutexNode *node, const LosTaskCB *run)
{
LosTaskCB *taskHead = OS_TCB_FROM_PENDLIST(LOS_DL_LIST_FIRST(&((*firstNode)->pendList)));//获取阻塞链表首个任务
LOS_DL_LIST *queueList = &((*firstNode)->queueList);
FutexNode *tailNode = NULL;
LosTaskCB *taskTail = NULL;
if (run->priority < taskHead->priority) {//任务的优先级比较
/* The one with the highest priority is inserted at the top of the queue */
LOS_ListTailInsert(queueList, &(node->queueList));//查到queueList的尾部
OsFutexReplaceQueueListHeadNode(*firstNode, node);//同时交换futexList链表上的位置
*firstNode = node;
return LOS_OK;
}
//如果等锁链表上没有任务或者当前任务大于链表首个任务
if (LOS_ListEmpty(queueList) && (run->priority >= taskHead->priority)) {
/* Insert the next position in the queue with equal priority */
LOS_ListHeadInsert(queueList, &(node->queueList));//从头部插入当前任务,当前任务是要被挂起的
return LOS_OK;
}
tailNode = OS_FUTEX_FROM_QUEUELIST(LOS_DL_LIST_LAST(queueList));//获取尾部节点
taskTail = OS_TCB_FROM_PENDLIST(LOS_DL_LIST_FIRST(&(tailNode->pendList)));//获取阻塞任务的最后一个
if ((run->priority >= taskTail->priority) ||//当前任务优先级比最后一个更高,或者 ... 没看懂, 为啥要这样 ? @notethinking
((run->priority - taskHead->priority) > (taskTail->priority - run->priority))) {//跟最后一个比较优先级
return OsFutexInsertFindFormBackToFront(queueList, run, node);//从后往前插入
}
return OsFutexInsertFindFromFrontToBack(queueList, run, node);//否则从前往后插入
}
申请读锁 | OsRwlockRdPendOp
申请读模式下的锁,分三种情况:
- 若无人持有锁,读任务可获得锁。
- 若有人持有锁,读任务可获得锁,读取顺序按照任务优先级。
- 若有人(非自己)持有写模式下的锁,则当前任务无法获得锁,直到写模式下的锁释放。
LOS_RwlockRdLock
OsRwlockRdUnsafe
OsRwlockRdPendOp
STATIC UINT32 OsRwlockRdPendOp(LosTaskCB *runTask, LosRwlock *rwlock, UINT32 timeout)
{
UINT32 ret;
if (rwlock->rwCount >= 0) {//第一和第二种情况
if (OsRwlockPriCompare(runTask, &(rwlock->writeList))) {//读优先级低于写优先级,意思就是必须先写再读
if (rwlock->rwCount == INT8_MAX) {//读锁任务达到上限
return LOS_EINVAL;
}
rwlock->rwCount++;//拿读锁成功
return LOS_OK;
}
}
if (!timeout) {
return LOS_EINVAL;
}
if (!OsPreemptableInSched()) {//不可抢占时
return LOS_EDEADLK;
}
/* The current task is not allowed to obtain the write lock when it obtains the read lock.
| 当前任务在获得读锁时不允许获得写锁 */
if ((LosTaskCB *)(rwlock->writeOwner) == runTask) { //拥有写锁任务是否为当前任务
return LOS_EINVAL;
}
/*
* When the rwlock mode is write mode or the priority of the current read task
* is lower than the first pended write task, current read task will be pended.
| 当 rwlock 模式为写模式或当前读任务的优先级低于第一个挂起的写任务时,当前读任务将被挂起。
反正就是写锁任务优先
*/
LOS_DL_LIST *node = OsSchedLockPendFindPos(runTask, &(rwlock->readList));//找到要挂入的位置
//例如现有链表内任务优先级为 0 3 8 9 23 当前为 10 时, 返回的是 9 这个节点
ret = OsSchedTaskWait(node, timeout, TRUE);//从尾部插入读锁链表 由此变成了 0 3 8 9 10 23
if (ret == LOS_ERRNO_TSK_TIMEOUT) {
return LOS_ETIMEDOUT;
}
return ret;
}
申请写锁 | OsRwlockWrPendOp
申请写模式下的锁,分两种情况:
若该锁当前没有任务持有,或者持有该读模式下的锁的任务和申请该锁的任务为同一个任务,则申请成功,可立即获得写模式下的锁。
若该锁当前已经存在读模式下的锁,且读取任务优先级较高,则当前任务挂起,直到读模式下的锁释放。
STATIC UINT32 OsRwlockWrPendOp(LosTaskCB *runTask, LosRwlock *rwlock, UINT32 timeout)
{
UINT32 ret;
/* When the rwlock is free mode, current write task can obtain this rwlock.
| 若该锁当前没有任务持有,或者持有该读模式下的锁的任务和申请该锁的任务为同一个任务,则申请成功,可立即获得写模式下的锁。*/
if (rwlock->rwCount == 0) {
rwlock->rwCount = -1;
rwlock->writeOwner = (VOID *)runTask;//直接给当前进程锁
return LOS_OK;
}
/* Current write task can use one rwlock once again if the rwlock owner is it.
| 如果 rwlock 拥有者是当前写入任务,则它可以再次使用该锁。*/
if ((rwlock->rwCount < 0) && ((LosTaskCB *)(rwlock->writeOwner) == runTask)) {
if (rwlock->rwCount == INT8_MIN) {
return LOS_EINVAL;
}
rwlock->rwCount--;//注意再次拥有算是两把写锁了.
return LOS_OK;
}
if (!timeout) {
return LOS_EINVAL;
}
if (!OsPreemptableInSched()) {
return LOS_EDEADLK;
}
/*
* When the rwlock is read mode or other write task obtains this rwlock, current
* write task will be pended. | 当 rwlock 为读模式或其他写任务获得该 rwlock 时,当前的写任务将被挂起。直到读模式下的锁释放
*/
LOS_DL_LIST *node = OsSchedLockPendFindPos(runTask, &(rwlock->writeList));//找到要挂入的位置
ret = OsSchedTaskWait(node, timeout, TRUE);//从尾部插入写锁链表
if (ret == LOS_ERRNO_TSK_TIMEOUT) {
ret = LOS_ETIMEDOUT;
}
return ret;
}
释放读写锁 | OsRwlockPostOp
通常释放操作很简单 ,不会特书大书 ,但读写锁不一样,释放操作反而是精彩部分,因为释放就意味着等这把锁的任务有机会运行了,这些任务需要被唤醒 ,而读模式下能同时进行读操作,意味着内核要连续的唤醒等待读操作的任务 。所以出现了以下代码,站长认为这部分代码写的很精彩,必须要点赞 @note_good
/// 释放锁
STATIC UINT32 OsRwlockPostOp(LosRwlock *rwlock, BOOL *needSched)
{
UINT32 rwlockMode;
LosTaskCB *resumedTask = NULL;
UINT16 pendedWriteTaskPri;
rwlock->rwCount = 0;
rwlock->writeOwner = NULL;
rwlockMode = OsRwlockGetMode(&(rwlock->readList), &(rwlock->writeList));//先获取模式
if (rwlockMode == RWLOCK_NONE_MODE) {//自由模式则正常返回
return LOS_OK;
}
/* In this case, rwlock will wake the first pended write task. | 在这种情况下,rwlock 将唤醒第一个挂起的写任务。 */
if ((rwlockMode == RWLOCK_WRITE_MODE) || (rwlockMode == RWLOCK_WRITEFIRST_MODE)) {//如果当前是写模式 (有任务在等写锁涅)
resumedTask = OS_TCB_FROM_PENDLIST(LOS_DL_LIST_FIRST(&(rwlock->writeList)));//获取任务实体
rwlock->rwCount = -1;//直接干成-1,注意这里并不是 --
rwlock->writeOwner = (VOID *)resumedTask;//有锁了则唤醒等锁的任务(写模式)
OsSchedTaskWake(resumedTask);
if (needSched != NULL) {
*needSched = TRUE;
}
return LOS_OK;
}
/* In this case, rwlock will wake the valid pended read task. 在这种情况下,rwlock 将唤醒有效的挂起读取任务。 */
if (rwlockMode == RWLOCK_READFIRST_MODE) { //如果是读优先模式
pendedWriteTaskPri = OS_TCB_FROM_PENDLIST(LOS_DL_LIST_FIRST(&(rwlock->writeList)))->priority;//取出写锁任务的最高优先级
}
resumedTask = OS_TCB_FROM_PENDLIST(LOS_DL_LIST_FIRST(&(rwlock->readList)));//获取最高优先级读锁任务
rwlock->rwCount = 1; //直接干成1,因为是释放操作
OsSchedTaskWake(resumedTask);//有锁了则唤醒等锁的任务(读模式)
while (!LOS_ListEmpty(&(rwlock->readList))) {//遍历读链表,目的是要唤醒其他读模式的任务(优先级得要高于pendedWriteTaskPri才行)
resumedTask = OS_TCB_FROM_PENDLIST(LOS_DL_LIST_FIRST(&(rwlock->readList)));
if ((rwlockMode == RWLOCK_READFIRST_MODE) && (resumedTask->priority >= pendedWriteTaskPri)) {//低于写模式的优先级
break;//跳出循环
}
if (rwlock->rwCount == INT8_MAX) {
return EINVAL;
}
rwlock->rwCount++;//读锁任务数量增加
OsSchedTaskWake(resumedTask);//不断唤醒读锁任务,由此实现了允许多个读操作并发,因为在多核情况下resumedTask很大可能
//与当前任务并不在同一个核上运行, 此处非常的有意思,点赞! @note_good
}
if (needSched != NULL) {
*needSched = TRUE;
}
return LOS_OK;
}
解读:
因写模式为互斥操作,背后的含义就是只能唤醒一个任务运行,所以取出首个(最高优先级)任务后,将任务唤醒加入就绪队列 OsSchedTaskWake ,申请调度 。
if ((rwlockMode == RWLOCK_WRITE_MODE) || (rwlockMode == RWLOCK_WRITEFIRST_MODE)) {//如果当前是写模式 (有任务在等写锁涅)
resumedTask = OS_TCB_FROM_PENDLIST(LOS_DL_LIST_FIRST(&(rwlock->writeList)));//获取任务实体
rwlock->rwCount = -1;//直接干成-1,注意这里并不是 --
rwlock->writeOwner = (VOID *)resumedTask;//有锁了则唤醒等锁的任务(写模式)
OsSchedTaskWake(resumedTask);
if (needSched != NULL) {
*needSched = TRUE;
}
return LOS_OK;
}
而读模式为共享模式,读任务可以同时执行,这里的同时指的是并行,在多个CPU上同时执行读操作。能同时执行多少个取决于写链表上的优先级,所以会先拿到最高写任务优先级pendedWriteTaskPri用于连续唤醒读任务的优先级比较。
if (rwlockMode == RWLOCK_READFIRST_MODE) { //如果是读优先模式
pendedWriteTaskPri = OS_TCB_FROM_PENDLIST(LOS_DL_LIST_FIRST(&(rwlock->writeList)))->priority;//取出写锁任务的最高优先级
}
通过循环连续的将读任务加入就绪队列,这才是真正做到能同时进行读操作(共享锁)的代码 !!! 这段代码一定要点赞,建议反复理解,有种醍醐灌顶 ,豁然开朗的奇妙感觉。
while (!LOS_ListEmpty(&(rwlock->readList))) {//遍历读链表,目的是要唤醒其他读模式的任务(优先级得要高于pendedWriteTaskPri才行)
resumedTask = OS_TCB_FROM_PENDLIST(LOS_DL_LIST_FIRST(&(rwlock->readList)));
if ((rwlockMode == RWLOCK_READFIRST_MODE) && (resumedTask->priority >= pendedWriteTaskPri)) {//低于写模式的优先级
break;//跳出循环
}
if (rwlock->rwCount == INT8_MAX) {
return EINVAL;
}
rwlock->rwCount++;//读锁任务数量增加
OsSchedTaskWake(resumedTask);//不断唤醒读锁任务,由此实现了允许多个读操作并发,因为在多核情况下resumedTask很大可能
//与当前任务并不在同一个核上运行, 此处非常的有意思,点赞! @note_good
}
百文说内核 | 抓住主脉络
子曰:“诗三百,一言以蔽之,曰‘思无邪’。”——《论语》:为政篇。百文相当于摸出内核的肌肉和器官系统,让人开始丰满有立体感,因是直接从注释源码起步,在开源鸿蒙内核源码加注释过程中,每每有心得处就整理,慢慢形成了以下文章。内容立足源码,常以生活场景打比方尽可能多的将内核知识点置入某种场景,具有画面感,容易理解记忆。说别人能听得懂的话很重要! 百篇博客绝不是百度教条式的在说一堆诘屈聱牙的概念,那没什么意思。更希望让内核变得栩栩如生,倍感亲切.确实有难度,自不量力,但已经出发,回头已是不可能的了。
百万汉字注解内核目的是要看清楚其毛细血管,细胞结构,等于在拿放大镜看内核。内核并不神秘,带着问题去源码中找答案是很容易上瘾的,你会发现很多文章对一些问题的解读是错误的,或者说不深刻难以自圆其说,你会慢慢形成自己新的解读,而新的解读又会碰到新的问题,如此层层递进,滚滚向前,拿着放大镜根本不愿意放手。
与代码有bug需不断debug一样,文章和注解内容会存在不少错漏之处,请多包涵,但会反复修正,持续更新,v**.xx 代表文章序号和修改的次数,精雕细琢,言简意赅,力求打造精品内容。百篇博客系列思维导图结构如下:
根据上图的思维导图,我们未来将要和大家一一分享以上大部分关键技术点的博客文章。
百万汉字注解.精读内核源码
如果大家觉得看文章不过瘾,想直接撸代码的话,可以去下面四大码仓围观同步注释内核源码:
gitee仓:
https://gitee.com/weharmony/kernel_liteos_a_note
github仓 :
https://github.com/kuangyufei/kernel_liteos_a_note
codechina仓:
https://codechina.csdn.net/kuangyufei/kernel_liteos_a_note
coding仓:
https://weharmony.coding.net/public/harmony/kernel_liteos_a_note/git/files
写在最后
我们最近正带着大家玩嗨OpenHarmony。如果你有用OpenHarmony开发的好玩的东东,或者有对OpenHarmony的深度技术剖析,想通过我们平台让更多的小伙伴知道和分享的,欢迎投稿,让我们一起嗨起来!有点子,有想法,有Demo,立刻联系我们:
合作邮箱:zzliang@atomsource.org