我的新浪微博:。
欢迎大家相互交流,共同提高技术。
第三节、rpc通信过程分析
前面两个小节分别对rpc服务端和客户端的建立流程做了详细的分析,也就是说rpc客户端和服务器端已经能够进行正常的通信了(rpc客户端已经通过connect链接上rpc服务器了),那么这一小节主要根据一个实际的例子来分析一个完整的rpc通信过程。
下面以客户端创建逻辑卷(volume)为例来分析rpc的通信过程,就以下面这个客户端的命令开始:
gluster volume create test-volume server3:/exp3 server4:/exp4
先简单看看glusterfs的客户端是怎样开始提交rpc请求的,提交准备过程流程图如下:
从上面的流程图可以看出真正开始提交rpc请求调用还是从具体命令的回调函数开始发起的,上面的流程图主要展示的是准备工作,下面从具体命令的回调函数开始分析,这里分析的实例是创建逻辑卷的命令,执行的函数是cli_cmd_volume_create_cbk,主要实现代码如下:
1 proc = &cli_rpc_prog->proctable[GLUSTER_CLI_CREATE_VOLUME];//从rpc程序表中选择对应函数 2 3 frame = create_frame (THIS, THIS->ctx->pool);//创建帧 4 5 ret = cli_cmd_volume_create_parse (words, wordcount, &options);//创建逻辑卷的命令解析 6 7 if (proc->fn) { 8 9 ret = proc->fn (frame, THIS, options);//执行命令的回调函数10 11 }12 13 if (ret) {14 15 cli_cmd_sent_status_get (&sent);//得到命令发送状态16 17 if ((sent == 0) && (parse_error == 0))18 19 cli_out ("Volume create failed");//如果失败,错误提示20 21 }
首先选择对应命令的rpc客户端创建逻辑卷的命令函数,然后解析命令以后执行相应的创建逻辑卷的rpc函数,下面是对应的函数存放表项:
1 [GLUSTER_CLI_CREATE_VOLUME] = { "CREATE_VOLUME", gf_cli3_1_create_volume}
所以真正的提交函数是gf_cli3_1_create_volume函数,继续分析这个函数,主要实现代码如下:
1 ret = cli_cmd_submit (&req, frame, cli_rpc_prog, GLUSTER_CLI_CREATE_VOLUME, NULL,2 3 gf_xdr_from_cli_create_vol_req, this, gf_cli3_1_create_volume_cbk);
主要代码也只有一行,其余代码就是为了这一行的函数调用做相应参数准备的,这一行的这个函数就是所有客户端命令提交rpc请求服务的实现函数,只是提交的数据不同而已!下面重点分析这个函数,还是先看看主要代码:
1 cli_cmd_lock ();//命令对象加锁 2 3 cmd_sent = 0;//初始化命令发送状态标示为0 4 5 ret = cli_submit_request (req, frame, prog, procnum, NULL, sfunc, this, cbkfn);//提交请求 6 7 if (!ret) { 8 9 cmd_sent = 1;//标示已经发送10 11 ret = cli_cmd_await_response ();//等待响应12 13 } else14 15 cli_cmd_unlock ();//不成功解锁
在发送具体的rpc请求以前先锁住命令对象,然后调用函数cli_submit_request 把rpc请求发送出去(应该是异步的),然后设置命令以发送标志,并调用函数cli_cmd_await_response等待响应。继续看提交rpc请求的函数:
1 iobuf = iobuf_get (this->ctx->iobuf_pool);//从缓冲池取一个io缓存 2 3 if (!iobref) { 4 5 iobref = iobref_new ();//新建一个iobuf引用池 6 7 new_iobref = 1;//标志 8 9 }10 11 iobref_add (iobref, iobuf);//把io缓存加入io缓存引用池12 13 iov.iov_base = iobuf->ptr;//io向量基地址(供用户使用的内存)14 15 iov.iov_len = 128 * GF_UNIT_KB;//大小16 17 if (req && sfunc) {18 19 ret = sfunc (iov, req);//序列化为xdr格式数据(表示层数据格式)20 21 iov.iov_len = ret;//序列化以后的长度22 23 count = 1;//计数初始化为124 25 }26 27 ret = rpc_clnt_submit (global_rpc, prog, procnum, cbkfn, &iov, count,//提交客户端rpc请求28 29 NULL, 0, iobref, frame, NULL, 0, NULL, 0, NULL);
Xdr数据格式的转换是调用函数库实现的,不具体分析,需要明白的是经过sfunc 函数调用以后就是xdr格式的数据了,最后根据转化后的数据调用rpc_clnt_submit提交客户端的rpc请求。继续深入函数:
1 rpcreq = mem_get (rpc->reqpool);//重rpc对象的请求对象池得到一个请求对象 2 3 if (!iobref) { 4 5 iobref = iobref_new ();//如果io缓存引用池为null就新建一个 6 7 new_iobref = 1;//新建标志 8 9 }10 11 callid = rpc_clnt_new_callid (rpc);//新建一个rpc调用的id号12 13 conn = &rpc->conn;//从rpc对象中取得链接对象14 15 rpcreq->prog = prog;//赋值rpc请求对象的程序16 17 rpcreq->procnum = procnum;//程序号18 19 rpcreq->conn = conn;//链接对象20 21 rpcreq->xid = callid;//调用id号22 23 rpcreq->cbkfn = cbkfn;//回调函数24 25 if (proghdr) { //程序头不为空26 27 proglen += iov_length (proghdr, proghdrcount);//计算头部长度加入程序消息总长度28 29 }30 31 if (progpayload) {32 33 proglen += iov_length (progpayload, progpayloadcount);//计算io向量的长度加入总长度34 35 }36 37 request_iob = rpc_clnt_record (rpc, frame, prog, procnum, proglen, &rpchdr, callid);//建立rpc记录38 39 iobref_add (iobref, request_iob);//添加rpc记录的io缓存区到io缓存引用池40 41 req.msg.rpchdr = &rpchdr;//rpc请求消息头部42 43 req.msg.rpchdrcount = 1;//头部数量44 45 req.msg.proghdr = proghdr;//程序头部46 47 req.msg.proghdrcount = proghdrcount;//程序头部数量48 49 req.msg.progpayload = progpayload;//xdr格式数据50 51 req.msg.progpayloadcount = progpayloadcount;//数量52 53 req.msg.iobref = iobref;//io缓存引用池54 55 req.rsp.rsphdr = rsphdr;//响应头部56 57 req.rsp.rsphdr_count = rsphdr_count;//数量58 59 req.rsp.rsp_payload = rsp_payload;//负载60 61 req.rsp.rsp_payload_count = rsp_payload_count;//数量62 63 req.rsp.rsp_iobref = rsp_iobref;//响应缓存引用池64 65 req.rpc_req = rpcreq;//rpc请求66 67 pthread_mutex_lock (&conn->lock);//加锁68 69 {70 71 if (conn->connected == 0) { //还没有建立连接72 73 rpc_transport_connect (conn->trans, conn->config.remote_port);//建立连接74 75 }76 77 ret = rpc_transport_submit_request (rpc->conn.trans, &req);//提交传输层rpc请求78 79 if ((ret >= 0) && frame) {80 81 gettimeofday (&conn->last_sent, NULL);//设置最后发送时间82 83 __save_frame (rpc, frame, rpcreq);//保存帧到队列84 85 }86 87 }88 89 pthread_mutex_unlock (&conn->lock);//解锁
经过上面的代码,现在数据已经到达传输层,所以现在就开始调用传输层的rpc请求发送函数rpc_transport_submit_request,代码如下:
1 ret = this->ops->submit_request (this, req);
这里采用函数指针的方式进行调用的,具体的传输协议调用具体的传输函数,这些函数都是在装载协议库的时候已经赋值具体函数的实现了,分析的是tcp,所以看看tcp的发送函数:
1 struct rpc_transport_ops tops = {2 3 ......4 5 .submit_request = socket_submit_request,6 7 ......8 9 };
从上面可以看出tcp的发送函数是socket_submit_request,主要实现代码如下:
1 pthread_mutex_lock (&priv->lock);//加锁 2 3 { 4 5 priv->submit_log = 0;//提交标志初始化为0 6 7 entry = __socket_ioq_new (this, &req->msg);//根据请求对象的消息新建一个io请求队列 8 9 if (list_empty (&priv->ioq)) { //判断提交io请求队列是否为空10 11 ret = __socket_ioq_churn_entry (this, entry);//开始依次提交传输层的io请求12 13 if (ret == 0)14 15 need_append = 0;//需要添加到entry链表16 17 if (ret > 0)18 19 need_poll_out = 1;//需要注册可写事件20 21 }22 23 if (need_append) {24 25 list_add_tail (&entry->list, &priv->ioq);//添加到entry的链表26 27 }28 29 if (need_poll_out) { //注册可写事件30 31 priv->idx = event_select_on (ctx->event_pool, priv->sock, priv->idx, -1, 1);32 33 }34 35 }36 37 pthread_mutex_unlock (&priv->lock);//解锁
这段加锁的代码就是完成整个rpc请求信息的发送,如果没有发送完毕就在注册一个可写事件启动下一次请求,到此客户端的rpc请求已经发送完毕,就开始等待服务器的响应。
下面就看看rpc服务器端怎么响应客户端的请求,并根据相应的请求命令做怎样的处理。在分析rpc服务启动的时候知道注册了监听事件,监听事件的处理函数是socket_server_event_handler,它的主要实现代码如下:
1 pthread_mutex_lock (&priv->lock); 2 3 { 4 5 if (poll_in) { //连接到来是可读事件 6 7 new_sock = accept (priv->sock, SA (&new_sockaddr), &addrlen);//接收客户端连接 8 9 if (!priv->bio) { //设置非阻塞10 11 ret = __socket_nonblock (new_sock);12 13 }14 15 if (priv->nodelay) { //设置无延迟发送16 17 ret = __socket_nodelay (new_sock);18 19 }20 21 if (priv->keepalive) { //设置保持连接22 23 ret = __socket_keepalive (new_sock, priv->keepaliveintvl, priv->keepaliveidle);24 25 }26 27 //为连接对象28 29 new_trans = GF_CALLOC (1, sizeof (*new_trans), gf_common_mt_rpc_trans_t);30 31 new_trans->name = gf_strdup (this->name);//赋值名称32 33 memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, addrlen);//赋值地址信息34 35 new_trans->peerinfo.sockaddr_len = addrlen;//长度36 37 new_trans->myinfo.sockaddr_len = sizeof (new_trans->myinfo.sockaddr);38 39 ret = getsockname (new_sock, SA (&new_trans->myinfo.sockaddr),40 41 &new_trans->myinfo.sockaddr_len);//得到新socket的地址信息42 43 get_transport_identifiers (new_trans);44 45 socket_init (new_trans);//初始化新的传输层对象(新的socket)46 47 pthread_mutex_lock (&new_priv->lock);48 49 {50 51 new_priv->connected = 1;//连接已经建立52 53 rpc_transport_ref (new_trans);//传输对象引用计数加154 55 new_priv->idx = event_register (ctx->event_pool, new_sock,//注册可读事件56 57 socket_event_handler, new_trans, 1, 0);58 59 }60 61 pthread_mutex_unlock (&new_priv->lock);62 63 //执行传输对象注册的通知函数,通知已经接受客户端连接请求64 65 ret = rpc_transport_notify (this, RPC_TRANSPORT_ACCEPT, new_trans);66 67 }68 69 }70 71 pthread_mutex_unlock (&priv->lock);
上面的代码主要就是处理客户端的连接请求,然后在新的socket上注册可读事件(准备读取客户端发送来的rpc请求信息),并且执行通知函数做相应的处理。注册的可读事件的处理函数是socket_event_handler,主要是实现代码如下:
1 if (!priv->connected) { //如果连接还没有完成就继续完成连接,因为连接是异步的可能没有立即完成 2 3 ret = socket_connect_finish (this); 4 5 } 6 7 if (!ret && poll_out) { //处理可写事件 8 9 ret = socket_event_poll_out (this);10 11 }12 13 if (!ret && poll_in) { //处理可读事件14 15 ret = socket_event_poll_in (this);16 17 }
客户端的连接请求对于服务器来说是可读事件,所以执行的socket_event_poll_in函数,当服务器需要发送响应信息到rpc客户端的时候就会执行可写事件处理函数。继续分析接收客户端请求信息的处理函数socket_event_poll_in主要代码如下:
1 ret = socket_proto_state_machine (this, &pollin);//根据rpc服务记录的状态做相应处理2 3 if (pollin != NULL) {4 5 ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED, pollin);//执行通知函数6 7 rpc_transport_pollin_destroy (pollin);//完成处理就销毁资源8 9 }
上面的代码主要还是调用其它函数继续处理rpc客户端的请求信息,然后执行通知函数通知传输对象消息已经被接收,最后销毁传输层相关不在需要的资源。处理具体请求信息的实现是在函数socket_proto_state_machine,而这个函数又调用__socket_proto_state_machine来处理,所以看看这个函数实现功能的主要代码:
1 while (priv->incoming.record_state != SP_STATE_COMPLETE) { //直到rpc服务记录状态完成为止 2 3 switch (priv->incoming.record_state) { //根据现在rpc服务记录的状态做相应处理 4 5 case SP_STATE_NADA://开始状态 6 7 iobuf = iobuf_get (this->ctx->iobuf_pool);//取得一个io缓存 8 9 priv->incoming.record_state = SP_STATE_READING_FRAGHDR;//改变状态为读取头部 10 11 case SP_STATE_READING_FRAGHDR://读取头部信息 12 13 ret = __socket_readv (this, priv->incoming.pending_vector, 1,//读取信息 14 15 &priv->incoming.pending_vector, 16 17 &priv->incoming.pending_count, NULL); 18 19 if (ret > 0) { //读取了部分头部信息 20 21 } 22 23 if (ret == 0) { //读取了所有头部信息,继续下一步的处理 24 25 priv->incoming.record_state = SP_STATE_READ_FRAGHDR;//改变为下一步 26 27 } 28 29 case SP_STATE_READ_FRAGHDR://处理已经读取的头部信息 30 31 priv->incoming.fraghdr = ntoh32 (priv->incoming.fraghdr);//转换头部信息为主机字节 32 33 priv->incoming.record_state = SP_STATE_READING_FRAG;//转化为读取帧数据状态 34 35 priv->incoming.total_bytes_read += RPC_FRAGSIZE(priv->incoming.fraghdr);//字节数 36 37 case SP_STATE_READING_FRAG://读取所有的数据 38 39 ret = __socket_read_frag (this);//读取所有帧数据 40 41 priv->incoming.frag.bytes_read = 0; 42 43 if (!RPC_LASTFRAG (priv->incoming.fraghdr)) { //是否为最后一帧数据 44 45 priv->incoming.record_state = SP_STATE_READING_FRAGHDR;//不是 46 47 break;//退出循环,从新读取头部信息 48 49 } 50 51 if (pollin != NULL) { 52 53 int count = 0;//计数 54 55 priv->incoming.iobuf_size = priv->incoming.total_bytes_read 56 57 - priv->incoming.payload_vector.iov_len;//计算io缓存大小 58 59 memset (vector, 0, sizeof (vector));//io向量清零 60 61 if (priv->incoming.iobref == NULL) { //io缓存引用池为null就新建一个 62 63 priv->incoming.iobref = iobref_new (); 64 65 } 66 67 vector[count].iov_base = iobuf_ptr (priv->incoming.iobuf);//取io缓存基地址 68 69 vector[count].iov_len = priv->incoming.iobuf_size;//io缓存长度 70 71 iobref = priv->incoming.iobref;//io缓存引用池 72 73 count++;//计数加1 74 75 if (priv->incoming.payload_vector.iov_base != NULL) { //负载向量不为null 76 77 vector[count] = priv->incoming.payload_vector;//保存负载io向量 78 79 count++;//计数加1 80 81 } 82 83 //新建一个传输层可取对象 84 85 *pollin = rpc_transport_pollin_alloc (this, vector, count, priv->incoming.iobuf, 86 87 iobref, priv->incoming.request_info); 88 89 iobuf_unref (priv->incoming.iobuf);//io缓存引用计算减1 90 91 priv->incoming.iobuf = NULL;//清零 92 93 if (priv->incoming.msg_type == REPLY)//消息类型是回复 94 95 (*pollin)->is_reply = 1;//设置回复标志 96 97 priv->incoming.request_info = NULL;//请求信息清零 98 99 }100 101 priv->incoming.record_state = SP_STATE_COMPLETE;//设置为完成状态102 103 break;104 105 }106 107 }108 109 if (priv->incoming.record_state == SP_STATE_COMPLETE) { //如果rpc请求记录为完成状态110 111 priv->incoming.record_state = SP_STATE_NADA;//重新初始化为开始状态112 113 __socket_reset_priv (priv);//复位私有数据对象114 115 }
整个处理过程分为了几个阶段,而且每一个阶段只处理相应的事情,然后就进入下一个阶段,因为前几个阶段case语言都是不带break的,所以直接进入下一个阶段,最终达到完成状态就退出循环,一个完成的处理过程其实就只需要一次循环就解决了。当所有rpc请求消息都已经接收以后就调用通知函数(在传输对象上注册的通知函数)通知传输对象消息已经接收,由rpc服务器的初始化过程我们知道注册的传输对象通知函数是rpcsvc_notify ,这个函数主要实现代码如下:
1 switch (event) { 2 3 case RPC_TRANSPORT_ACCEPT://rpc请求已经被接收处理 4 5 new_trans = data; 6 7 ret = rpcsvc_accept (svc, trans, new_trans);//处理函数 8 9 break;10 11 case RPC_TRANSPORT_DISCONNECT://断开连接消息12 13 ret = rpcsvc_handle_disconnect (svc, trans);//处理函数14 15 break;16 17 case RPC_TRANSPORT_MSG_RECEIVED://消息已经接收18 19 msg = data;20 21 ret = rpcsvc_handle_rpc_call (svc, trans, msg);//rpc调用处理函数22 23 break;24 25 case RPC_TRANSPORT_MSG_SENT://消息已经发生,不需要处理26 27 break;28 29 case RPC_TRANSPORT_CONNECT://已经连接30 31 break;32 33 case RPC_TRANSPORT_CLEANUP://清零消息34 35 listener = rpcsvc_get_listener (svc, -1, trans->listener);//得到对应的监听器对象36 37 rpcsvc_program_notify (listener, RPCSVC_EVENT_TRANSPORT_DESTROY, trans);//通知上层38 39 break;40 41 case RPC_TRANSPORT_MAP_XID_REQUEST:42 43 break;44 45 }
传输对象注册的通知函数会根据传递过来的信息类型做相应的处理,这里传递过来的消息是消息已经接收,它的处理就是开始执行rpc调用了,执行的函数是rpcsvc_handle_rpc_call,它的主要实现代码如下:
1 req = rpcsvc_request_create (svc, trans, msg);//创建一个rpc服务请求对象 2 3 if (!rpcsvc_request_accepted (req))//判断rpc请求是否被接受 4 5 ; 6 7 actor = rpcsvc_program_actor (req);//根据请求对象取得rpc过程调用对象 8 9 if (actor && (req->rpc_err == SUCCESS)) { //rpc过程调用对象不为null并且请求信息是成功的10 11 THIS = svc->mydata;//取得xlator对象12 13 if (req->count == 2) { //请求的数量等于214 15 if (actor->vector_actor) { //向量过程不为null,就执行向量处理函数16 17 ret = actor->vector_actor (req, &req->msg[1], 1, req->iobref);18 19 } else {20 21 rpcsvc_request_seterr (req, PROC_UNAVAIL);//出错,不可用的函数22 23 ret = RPCSVC_ACTOR_ERROR;//调用过程出错24 25 }26 27 } else if (actor->actor) {28 29 ret = actor->actor (req);//调用rpc请求函数30 31 }32 33 }34 35 if (ret == RPCSVC_ACTOR_ERROR) { //出错36 37 ret = rpcsvc_error_reply (req);//回复客户端rpc请求处理出错38 39 }
上面代码首先根据接收到的信息建立一个请求对象,然后根据建立的请求对象判断是都已经成功接纳此次rpc请求调用,如果是就继续执行函数rpcsvc_program_actor,这个函数会根据程序号、函数号等信息查找对应的rpc请求的远程过程调用,如果找到就执行相应的函数调用。我们分析的是客户端发送一条创建逻辑卷的命令道服务器端,根据服务器端在启动初始化的过程中注册的程序集中我们能够找到如下一条对应的函数信息:
1 [GLUSTER_CLI_CREATE_VOLUME] = { "CLI_CREATE_VOLUME", GLUSTER_CLI_CREATE_VOLUME, glusterd_handle_create_volume, NULL,NULL},
所以服务器端就会调用函数glusterd_handle_create_volume,如果在处理rpc请求的过程中遇到错误就会向客户端发送一个错误信息的相应消息。当然如果调用成功的话也同样会返回给客户端一个相应的结果信息。客户端会接收服务器端的回复,然后根据消息内容做相应的处理,如:创建成功等提示信息。这样一次完整的rpc通信就完成了。