基本概念
LiteIPC是OpenHarmony LiteOS-A内核提供的一种新型IPC(Inter-Process Communication,即进程间通信)机制,为轻量级进程间通信组件,为面向服务的系统服务框架提供进程间通信能力,分为内核实现和用户态实现两部分,其中内核实现完成进程间消息收发、IPC内存管理、超时通知和死亡通知等功能;用户态提供序列化和反序列化能力,并完成IPC回调消息和死亡消息的分发。
我们主要讲解内核态实现部分,本想一篇说完,但发现它远比想象中的复杂和重要,所以分通讯内容和通讯机制上下两篇说。上篇可翻看 鸿蒙内核源码分析(消息封装篇) | 剖析LiteIpc(上)进程通讯内容 ,本篇为通讯机制,介绍liteipc在内核层的实现过程。
空间映射
映射 一词在系列篇中多次出现,虚拟地址的基础就是映射,共享内存的实现也要靠映射,LiteIPC通讯的底层实现也离不开映射,有意思的是将用户态的线性区和内核态的线性区进行了映射。也就是说当用户访问用户空间中的某个虚拟地址时,其实和内核空间某个虚拟地址都指向了同一个物理内存地址。可能有人会问这也行 ? 在了解完物理地址,内核虚地址,用户虚地址之间的关系后就会明白这当然可以。
- 虚拟地址包括内核空间地址和用户进程空间地址,它们的范围对外暴露,由系统集成商设定,内核也专门提供了判断函数。即看到一个虚拟地址可以知道是内核在用还是用户(应用)进程在用。
/// 虚拟地址是否在内核空间
STATIC INLINE BOOL LOS_IsKernelAddress(VADDR_T vaddr)
{
return ((vaddr >= (VADDR_T)KERNEL_ASPACE_BASE) &&
(vaddr <= ((VADDR_T)KERNEL_ASPACE_BASE + ((VADDR_T)KERNEL_ASPACE_SIZE - 1))));
}
/// 虚拟地址是否在用户空间
STATIC INLINE BOOL LOS_IsUserAddress(VADDR_T vaddr)
{
return ((vaddr >= USER_ASPACE_BASE) &&
(vaddr <= (USER_ASPACE_BASE + (USER_ASPACE_SIZE - 1))));
}
- 物理地址是由物理内存提供,系统集成商根据实际的物理内存大小来设定地址范围。至于具体的某段物理内存是给内核空间还是给用户空间使用并没有要求,所谓映射是指虚拟地址 <--> 物理地址的映射,是 N:1的关系,一个物理地址可以被多个虚拟地址映射,而一个虚拟地址只能映射到一个物理地址。
- 以上是LiteIPC的实现概念基础,明白后就不难理解结构体IpcPool存在的意义了
/**
* @struct IpcPool | ipc池
* @brief LiteIPC的核心思想就是在内核态为每个Service任务维护一个IPC消息队列,该消息队列通过LiteIPC设备文件向上层
* 用户态程序分别提供代表收取IPC消息的读操作和代表发送IPC消息的写操作。
*/
typedef struct {
VOID *uvaddr;///< 用户态空间地址,由kvaddr映射而来的地址,这两个地址的关系一定要搞清楚,否则无法理解IPC的核心思想
VOID *kvaddr;///< 内核态空间地址,IPC申请的是内核空间,但是会通过 DoIpcMmap 将这个地址映射到用户空间
UINT32 poolSize; ///< ipc池大小
} IpcPool;
文件访问
LiteIPC的运行机制是首先将需要接收IPC消息的任务通过ServiceManager注册成为一个Service,然后通过ServiceManager为该Service任务配置访问权限,即指定哪些任务可以向该Service任务发送IPC消息。LiteIPC的核心思想就是在内核态为每个Service任务维护一个IPC消息队列,该消息队列通过LiteIPC设备文件向上层用户态程序分别提供代表收取IPC消息的读操作和代表发送IPC消息的写操作。 设备文件的接口层(VFS)实现为g_liteIpcFops,跟踪这几个函数就能够整明白整个实现LiteIPC过程
#define LITEIPC_DRIVER "/dev/lite_ipc"///< 虚拟设备,文件访问读取
STATIC const struct file_operations_vfs g_liteIpcFops = {
.open = LiteIpcOpen, /* open | 创建Ipc内存池*/
.close = LiteIpcClose, /* close */
.ioctl = LiteIpcIoctl, /* ioctl | 包含读写操作 */
.mmap = LiteIpcMmap, /* mmap | 实现线性区映射*/
}; LiteIpcOpen | 创建消息内存池
LITE_OS_SEC_TEXT STATIC int LiteIpcOpen(struct file *filep)
{
LosProcessCB *pcb = OsCurrProcessGet();
if (pcb->ipcInfo != NULL) {
return 0;
}
pcb->ipcInfo = LiteIpcPoolCreate();
if (pcb->ipcInfo == NULL) {
return -ENOMEM;
}
return 0;
}
///创建IPC消息内存池
LITE_OS_SEC_TEXT_INIT STATIC ProcIpcInfo *LiteIpcPoolCreate(VOID)
{
ProcIpcInfo *ipcInfo = LOS_MemAlloc(m_aucSysMem1, sizeof(ProcIpcInfo));//从内核堆内存中申请IPC控制体
if (ipcInfo == NULL) {
return NULL;
}
(VOID)memset_s(ipcInfo, sizeof(ProcIpcInfo), 0, sizeof(ProcIpcInfo));
(VOID)LiteIpcPoolInit(ipcInfo);
return ipcInfo;
} 解读
- 进来先获取当前进程OsCurrProcessGet(),即为每个进程创建唯一的IPC消息控制体,ProcIpcInfo在进程控制块中,负责管理IPC消息
- 初始化消息内存池,此处只申请结构体本身占用内存,真正的内存池在LiteIpcMmap中完成
LiteIpcMmap | 映射
///将参数线性区设为IPC专用区
LITE_OS_SEC_TEXT STATIC int LiteIpcMmap(struct file *filep, LosVmMapRegion *region)
{
int ret = 0;
LosVmMapRegion *regionTemp = NULL;
LosProcessCB *pcb = OsCurrProcessGet();
ProcIpcInfo *ipcInfo = pcb->ipcInfo;
//被映射的线性区不能在常量和私有数据区
if ((ipcInfo == NULL) || (region == NULL) || (region->range.size > LITE_IPC_POOL_MAX_SIZE) ||
(!LOS_IsRegionPermUserReadOnly(region)) || (!LOS_IsRegionFlagPrivateOnly(region))) {
ret = -EINVAL;
goto ERROR_REGION_OUT;
}
if (IsPoolMapped(ipcInfo)) {//已经用户空间和内核空间之间存在映射关系了
return -EEXIST;
}
if (ipcInfo->pool.uvaddr != NULL) {//ipc池已在进程空间有地址
regionTemp = LOS_RegionFind(pcb->vmSpace, (VADDR_T)(UINTPTR)ipcInfo->pool.uvaddr);//在指定进程空间中找到所在线性区
if (regionTemp != NULL) {
(VOID)LOS_RegionFree(pcb->vmSpace, regionTemp);//先释放线性区
}
// 建议加上 ipcInfo->pool.uvaddr = NULL; 同下
}
ipcInfo->pool.uvaddr = (VOID *)(UINTPTR)region->range.base;//将指定的线性区和ipc池虚拟地址绑定
if (ipcInfo->pool.kvaddr != NULL) {//如果存在内核空间地址
LOS_VFree(ipcInfo->pool.kvaddr);//因为要重新映射,所以必须先释放掉物理内存
ipcInfo->pool.kvaddr = NULL; //从效果上看, 这句话可以不加,但加上看着更舒服, uvaddr 和 kvaddr 一对新人迎接美好未来
}
/* use vmalloc to alloc phy mem */
ipcInfo->pool.kvaddr = LOS_VMalloc(region->range.size);//从内核动态空间中申请线性区,分配同等量的物理内存,做好 内核 <-->物理内存的映射
if (ipcInfo->pool.kvaddr == NULL) {//申请物理内存失败, 肯定是玩不下去了.
ret = -ENOMEM; //返回没有内存了
goto ERROR_REGION_OUT;
}
/* do mmap */
ret = DoIpcMmap(pcb, region);//对uvaddr和kvaddr做好映射关系,如此用户态下通过操作uvaddr达到操作kvaddr的目的
if (ret) {
goto ERROR_MAP_OUT;
}
/* ipc pool init */
if (LOS_MemInit(ipcInfo->pool.kvaddr, region->range.size) != LOS_OK) {//初始化ipc池
ret = -EINVAL;
goto ERROR_MAP_OUT;
}
ipcInfo->pool.poolSize = region->range.size;//ipc池大小为线性区大小
return 0;
ERROR_MAP_OUT:
LOS_VFree(ipcInfo->pool.kvaddr);
ERROR_REGION_OUT:
if (ipcInfo != NULL) {
ipcInfo->pool.uvaddr = NULL;
ipcInfo->pool.kvaddr = NULL;
}
return ret;
} 解读
- 这个函数一定要看明白,重要部分已加注释,主要干了两件事。
- 通过LOS_VMalloc向内核堆空间申请了一段物理内存,参数是线性区的大小,并做好了映射,因是内核堆空间,所以分配的虚拟地址就是个内核地址,并将这个地址赋给了pool.kvaddr
ipcInfo->pool.kvaddr = LOS_VMalloc(region->range.size);//从内核动态空间中申请线性区,分配同等量的物理内存,做好 内核 <-->物理内存的映射
- 通过DoIpcMmap将参数pcb(用户进程)的IPC消息池的pool.uvaddr也映射到LOS_VMalloc分配的物理内存上,
ret = DoIpcMmap(pcb, region);//对uvaddr和kvaddr做好映射关系,如此用户态下通过操作uvaddr达到操作 详看DoIpcMmap的实现,因为太重要此处代码不做删改。LITE_OS_SEC_TEXT STATIC INT32 DoIpcMmap(LosProcessCB *pcb, LosVmMapRegion *region)
{
UINT32 i;
INT32 ret = 0;
PADDR_T pa;
UINT32 uflags = VM_MAP_REGION_FLAG_PERM_READ | VM_MAP_REGION_FLAG_PERM_USER;
LosVmPage *vmPage = NULL;
VADDR_T uva = (VADDR_T)(UINTPTR)pcb->ipcInfo->pool.uvaddr;//用户空间地址
VADDR_T kva = (VADDR_T)(UINTPTR)pcb->ipcInfo->pool.kvaddr;//内核空间地址
(VOID)LOS_MuxAcquire(&pcb->vmSpace->regionMux);
for (i = 0; i < (region->range.size >> PAGE_SHIFT); i++) {//获取线性区页数,一页一页映射
pa = LOS_PaddrQuery((VOID *)(UINTPTR)(kva + (i << PAGE_SHIFT)));//通过内核空间查找物理地址
if (pa == 0) {
PRINT_ERR("%s, %d\n", __FUNCTION__, __LINE__);
ret = -EINVAL;
break;
}
vmPage = LOS_VmPageGet(pa);//获取物理页框
if (vmPage == NULL) {//目的是检查物理页是否存在
PRINT_ERR("%s, %d\n", __FUNCTION__, __LINE__);
ret = -EINVAL;
break;
}
STATUS_T err = LOS_ArchMmuMap(&pcb->vmSpace->archMmu, uva + (i << PAGE_SHIFT), pa, 1, uflags);//将物理页映射到用户空间
if (err < 0) {
ret = err;
PRINT_ERR("%s, %d\n", __FUNCTION__, __LINE__);
break;
}
}
/* if any failure happened, rollback | 如果中间发生映射失败,则回滚*/
if (i != (region->range.size >> PAGE_SHIFT)) {
while (i--) {
pa = LOS_PaddrQuery((VOID *)(UINTPTR)(kva + (i << PAGE_SHIFT)));//查询物理地址
vmPage = LOS_VmPageGet(pa);//获取物理页框
(VOID)LOS_ArchMmuUnmap(&pcb->vmSpace->archMmu, uva + (i << PAGE_SHIFT), 1);//取消与用户空间的映射
LOS_PhysPageFree(vmPage);//释放物理页
}
}
(VOID)LOS_MuxRelease(&pcb->vmSpace->regionMux);
return ret;
}
- 至次LiteIPC的准备工作已经完成,接下来就是操作/控制阶段了
LiteIpcIoctl | 控制
LITE_OS_SEC_TEXT int LiteIpcIoctl(struct file *filep, int cmd, unsigned long arg)
{
UINT32 ret = LOS_OK;
LosProcessCB *pcb = OsCurrProcessGet();
ProcIpcInfo *ipcInfo = pcb->ipcInfo;
// 整个系统只能有一个ServiceManager,而Service可以有多个。ServiceManager有两个主要功能:一是负责Service的注册和注销,
// 二是负责管理Service的访问权限(只有有权限的任务(Task)可以向对应的Service发送IPC消息)。
switch (cmd) {
case IPC_SET_CMS:
return SetCms(arg); //设置ServiceManager , 整个系统只能有一个ServiceManager
case IPC_CMS_CMD: // 控制命令,创建/删除/添加权限
return HandleCmsCmd((CmsCmdContent *)(UINTPTR)arg);
case IPC_SET_IPC_THREAD:
return SetIpcTask();//将当前任务设置成当前进程的IPC任务ID
case IPC_SEND_RECV_MSG://发送和接受消息,代表消息内容
ret = LiteIpcMsgHandle((IpcContent *)(UINTPTR)arg);//处理IPC消息
break;
}
return ret;
} 解读
- LiteIPC中有两个主要概念,一个是ServiceManager,另一个是Service。整个系统只能有一个ServiceManager,而Service可以有多个。ServiceManager有两个主要功能:一是负责Service的注册和注销,二是负责管理Service的访问权限(只有有权限的任务(Task)可以向对应的Service发送IPC消息)。IPC_SET_CMS为设置ServiceManager命令,IPC_CMS_CMD为对Service的管理命令。
- IPC_SEND_RECV_MSG为消息的处理过程,消息的封装结合上篇理解,接收和发送消息对应的是LiteIpcRead和LiteIpcWrite两个函数。
- LiteIpcWrite 写消息指的是从用户空间向内核空间写数据,在消息内容体中已经指明这个消息时写给哪个任务的,如此达到了进程间(其实也是任务间)通讯的目的。
/// 写IPC消息队列,从用户空间到内核空间
LITE_OS_SEC_TEXT STATIC UINT32 LiteIpcWrite(IpcContent *content)
{
UINT32 ret, intSave;
UINT32 dstTid;
IpcMsg *msg = content->outMsg;
LosTaskCB *tcb = OS_TCB_FROM_TID(dstTid);//目标任务实体
LosProcessCB *pcb = OS_PCB_FROM_PID(tcb->processID);//目标进程实体
if (pcb->ipcInfo == NULL) {
PRINT_ERR("pid %u Liteipc not create\n", tcb->processID);
return -EINVAL;
}
//这里为什么要申请msg->dataSz,因为IpcMsg中的真正数据体 data是个指针,它的大小是dataSz . 同时申请存储偏移量空间
UINT32 bufSz = sizeof(IpcListNode) + msg->dataSz + msg->spObjNum * sizeof(UINT32);//这句话是理解上层消息在内核空间数据存放的关键!!! @note_good
IpcListNode *buf = (IpcListNode *)LiteIpcNodeAlloc(tcb->processID, bufSz);//向内核空间申请bufSz大小内存
if (buf == NULL) {
PRINT_ERR("%s, %d\n", __FUNCTION__, __LINE__);
return -ENOMEM;
}//IpcListNode的第一个成员变量就是IpcMsg
ret = CopyDataFromUser(buf, bufSz, (const IpcMsg *)msg);//将消息内容拷贝到内核空间,包括消息控制体+内容体+偏移量
if (ret != LOS_OK) {
PRINT_ERR("%s, %d\n", __FUNCTION__, __LINE__);
goto ERROR_COPY;
}
if (tcb->ipcTaskInfo == NULL) {//如果任务还没有IPC信息
tcb->ipcTaskInfo = LiteIpcTaskInit();//初始化这个任务的IPC信息模块,因为消息来了要处理了
}
ret = HandleSpecialObjects(dstTid, buf, FALSE);//处理消息
if (ret != LOS_OK) {
PRINT_ERR("%s, %d\n", __FUNCTION__, __LINE__);
goto ERROR_COPY;
}
/* add data to list and wake up dest task *///向列表添加数据并唤醒目标任务
SCHEDULER_LOCK(intSave);
LOS_ListTailInsert(&(tcb->ipcTaskInfo->msgListHead), &(buf->listNode));//把消息控制体挂到目标任务的IPC链表头上
OsHookCall(LOS_HOOK_TYPE_IPC_WRITE, &buf->msg, dstTid, tcb->processID, tcb->waitFlag);
if (tcb->waitFlag == OS_TASK_WAIT_LITEIPC) {//如果这个任务在等这个消息,注意这个tcb可不是当前任务
OsTaskWakeClearPendMask(tcb);//撕掉对应标签
OsSchedTaskWake(tcb);//唤醒任务执行,因为任务在等待读取 IPC消息
SCHEDULER_UNLOCK(intSave);
LOS_MpSchedule(OS_MP_CPU_ALL);//设置调度方式,所有CPU核发生一次调度,这里非要所有CPU都调度吗?
//可不可以查询下该任务挂在哪个CPU上,只调度对应CPU呢? 注者在此抛出思考 @note_thinking
LOS_Schedule();//发起调度
} else {
SCHEDULER_UNLOCK(intSave);
}
return LOS_OK;
ERROR_COPY:
LiteIpcNodeFree(OS_TCB_FROM_TID(dstTid)->processID, buf);//拷贝发生错误就要释放内核堆内存,那可是好大一块堆内存啊
return ret;
}
- 大概流程就是从LiteIpc内存池中分配内核空间装用户空间的数据,注意一定要从LiteIpcNodeAlloc分配,原因代码中也已注明。
- 有数据了就将数据挂到目标任务的IPC双向链表上,如果任务在等待读取消息(OS_TASK_WAIT_LITEIPC)则唤醒目标任务执行,并发起调度LOS_Schedule。
- LiteIpcRead和LiteIpcWrite是遥相呼应,读消息指将内核空间数据读到用户空间处理。
/// 读取IPC消息
LITE_OS_SEC_TEXT STATIC UINT32 LiteIpcRead(IpcContent *content)
{
UINT32 intSave, ret;
UINT32 selfTid = LOS_CurTaskIDGet();//当前任务ID
LOS_DL_LIST *listHead = NULL;
LOS_DL_LIST *listNode = NULL;
IpcListNode *node = NULL;
UINT32 syncFlag = (content->flag & SEND) && (content->flag & RECV);//同步标签
UINT32 timeout = syncFlag ? LOS_MS2Tick(LITEIPC_TIMEOUT_MS) : LOS_WAIT_FOREVER;
LosTaskCB *tcb = OS_TCB_FROM_TID(selfTid);//获取当前任务实体
if (tcb->ipcTaskInfo == NULL) {//如果任务还没有赋予IPC功能
tcb->ipcTaskInfo = LiteIpcTaskInit();//初始化任务的IPC
}
listHead = &(tcb->ipcTaskInfo->msgListHead);//获取IPC信息头节点
do {//注意这里是个死循环
SCHEDULER_LOCK(intSave);
if (LOS_ListEmpty(listHead)) {//链表为空 ?
OsTaskWaitSetPendMask(OS_TASK_WAIT_LITEIPC, OS_INVALID_VALUE, timeout);//设置当前任务要等待的信息
OsHookCall(LOS_HOOK_TYPE_IPC_TRY_READ, syncFlag ? MT_REPLY : MT_REQUEST, tcb->waitFlag);//向hook模块输入等待日志信息
ret = OsSchedTaskWait(&g_ipcPendlist, timeout, TRUE);//将任务挂到全局链表上,任务进入等IPC信息,等待时间(timeout),此处产生调度,将切换到别的任务执行
//如果一个消息在超时前到达,则任务会被唤醒执行,返回就不是LOS_ERRNO_TSK_TIMEOUT
if (ret == LOS_ERRNO_TSK_TIMEOUT) {//如果发生指定的时间还没有IPC到达时
OsHookCall(LOS_HOOK_TYPE_IPC_READ_TIMEOUT, syncFlag ? MT_REPLY : MT_REQUEST, tcb->waitFlag);//打印任务等待IPC时发生 回复/请求超时
SCHEDULER_UNLOCK(intSave);
return -ETIME;
}
if (OsTaskIsKilled(tcb)) {//如果发生任务被干掉了的异常
OsHookCall(LOS_HOOK_TYPE_IPC_KILL, syncFlag ? MT_REPLY : MT_REQUEST, tcb->waitFlag);//打印任务在等待IPC期间被干掉了的
SCHEDULER_UNLOCK(intSave);
return -ERFKILL;
}
SCHEDULER_UNLOCK(intSave);
} else {//有IPC节点数据时
listNode = LOS_DL_LIST_FIRST(listHead);//拿到首个节点
LOS_ListDelete(listNode);//从链表上摘掉节点,读后即焚
node = LOS_DL_LIST_ENTRY(listNode, IpcListNode, listNode);//获取节点实体
SCHEDULER_UNLOCK(intSave);
ret = CheckRecievedMsg(node, content, tcb);//检查收到的信息
if (ret == LOS_OK) {//信息没问题
break;
}
if (ret == -ENOENT) { /* It means that we've recieved a failed reply | 收到异常回复*/
return ret;
}
}
} while (1);
node->msg.data = (VOID *)GetIpcUserAddr(LOS_GetCurrProcessID(), (INTPTR)(node->msg.data));//转成用户空间地址
node->msg.offsets = (VOID *)GetIpcUserAddr(LOS_GetCurrProcessID(), (INTPTR)(node->msg.offsets));//转成用户空间的偏移量
content->inMsg = (VOID *)GetIpcUserAddr(LOS_GetCurrProcessID(), (INTPTR)(&(node->msg)));//转成用户空间数据结构
EnableIpcNodeFreeByUser(LOS_GetCurrProcessID(), (VOID *)node);//创建一个空闲节点,并挂到进程IPC已使用节点链表上
return LOS_OK;
}
- 调度到目标任务后,将切到LiteIpcRead执行,此时读函数正在经历一个do .. while(1) 死循环等待消息到来。LiteIpcRead的最后是内核地址和用户地址的转换,这也是LiteIpc最精彩的部分,它们指向同一块数据。
- 当LiteIpcRead读取不到消息时,即当前任务的消息链表为空时,任务会设置一个等待标签OS_TASK_WAIT_LITEIPC,并将自己挂起,由OsSchedTaskWait让出CPU给其他任务继续执行,请反复理解读写函数。
|