`
helpbs
  • 浏览: 1161717 次
文章分类
社区版块
存档分类
最新评论

Linux Socket 学习(十一)

 
阅读更多
并发客户端服务器

到现在为止我们所介绍的这些服务器 程序,在接受下一个客户端连接之前只处理一个客户端请求。这对于即时回复的服务器来说是高效且简单的。然而,如果处理要花费较长时间,或者有一段不活动时 期,这样的设计就会无限制的阻止其他客户端的连接。因为服务器通常在最小的延迟时间内服务尽可能多的客户端,在服务器的连接端需要一个基础设计的改变。

在这一章,我们将会学到下面的一些内容:
为处理多个客户端连接使用fork(2)函数
wait(2)与waitpid(2)函数
处理多个客户端连接的select(2)函数

掌握了这些内容可以使得我们编写一次处理大量客户端的专业级服务器。

理解多客户端的问题

图11.1显示了多个客户端连接到同一个服务器的情况。

图11.1中心的服务器必须在多个连接的客户端之前平衡资源。服务器通常设计来使得每一个客户端认为自己独占服务器访问。然而,事实上,服务器以一种并发的方式来服务所有的客户端。

可以用下面的方法来实现这样的目的:
派生(fork)服务器进程(多进程方法)
线程化服务器进程(多线程方法)
一个进程与一个select调用
一个进程与一个poll(2)调用

使用fork系统调用的第一种方法也许是服务多客户端进程的最简单的方法。然而,他的缺点是信息的共享变得更为复杂。这通常需要使用消息队列,共享内存以及信号量。他的另一个缺点就是需要CPU为每一个请求启动并管理一个新的进程。

线程化服务器的方法对于UNIX而言是较新的方法,对于Linux也是一个新的选项。线程提供了多进程方法的轻量优点,而不会阻碍中心的通信。然而,线程处理难于高度,尤其对于编程新手而言尤其如此。因为这个原因,在这里我们并不会讨论线程的相关内容。

最后两种方法需要调用select或poll函数调用。每一个函数都提供了一个不同的方法来阻塞服务的运行,直到有事情发生。我们在这一章将会详细讨论select函数。感兴趣的读者可以阅读poll手册页来了解poll的相关内容。

使用fork(2)来服务多个客户端

在这里我们会使用fork函数修改在第10章所开发的服务器程序来处理多个客户端。下面的代码为修改过的rpnsrv.c模块。其他的代码与前一章的代码相同。
/*rpnsrv.c
*
* Example RPN Server:
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/wait.h>
#include <signal.h>

#ifndef SHUT_RDWR
#define SHUT_RDWR 3
#endif

extern int mkaddr(void *addr,
int *addr_len,
char *input_address,
char *protocol);

extern void rpn_process(FILE *tx,
char *buf);

/*
* Process Terminated Child processes:
*/
static void sigchld_handler(int signo)
{
pid_t PID;
int status;

do
{
PID = waitpid(-1,&status,WNOHANG);
}while(PID != -1);

/*
* Re-instate handler
*/
signal(SIGCHLD,sigchld_hander);
}

/*
* This function report the error and
* exits back to the shell:
*/
static void bail(const char *on_what)
{
if(errno != 0)
{
fputs(strerror(errno),stderr);
fputs(": ",stderr);
}
fputs(on_what,stderr);
fputc('/n',stderr);
exit(1);
}

int main(int argc,char **argv)
{
int z;
char *srvr_addr = "127.0.0.1:9090";
struct sockaddr_in adr_srvr; /* AF_INET */
struct sockaddr_in adr_clnt; /* AF_INET */
int len_inet; /* length */
int s = -1; /* Socket */
int c = -1; /* Client Socket */
FILE *rx = NULL; /* Read Stream */
FILE *tx = NULL; /* Write Stream */
char buf[4096]; /* I/O buffer */
pid_t PID; /* Process ID */

/*
* set signal hander for SIGCHLD:
*/
signal(SIGCHLD,sigchld_hander);

/*
* User a server address from the command
* line,otherwise default to 127.0.0.1:
*/
if(argc>=2)
{
srvr_addr = argv[1];
}

len_inet = sizeof adr_srvr;
z = mkaddr(&adr_srvr,&len_inet,
srvr_addr,"tcp");

if(z<0 || !adr_srvr.sin_port)
{
fprintf(stderr,"Invalid server "
"address, or no port number "
"was specified./n");
exit(1);
}

/*
* Create a TCP/IP socket to use:
*/
s = socket(PF_INET,SOCK_STREAM,0);
if(s==-1)
bail("socket(2)");

/*
* Bind the server address:
*/
z = bind(s,(struct sockaddr *)&adr_srvr,len_inet);
if(z==-1)
bail("bind(2)");

/*
* make it a listening socket:
*/
z = listen(s,10);
if(z==-1)
bail("listen(2)");

/*
* Start the server loop:
*/
for(;;)
{
/*
* wait for a connect:
*/
len_inet = sizeof adr_clnt;
c = accept(s,(struct sockaddr *)&adr_clnt,&len_inet);
if(c==-1)
bail("accept(2)");

/*
* For a new server process
* to service this client:
*/
if((PID = fork()) == -1)
{
/* Failed to fork:Give up */
close(c);
continue;
}
else if(PID > 0)
{
/* parent process: */
close(c);
continue;
}

/*
* CHILD process
* Create streams:
*/
rx = fdopen(c,"r");
if(!rx)
{
/* failed */
close(c);
continue;
}

tx = fdopen(dup(c),"w");
if(!tx)
{
fclose(rx);
continue;
}

/*
* Set both streams to line buffer mode:
*/
setlinebuf(rx);
setlinebuf(tx);

/*
* Process client's requests:
*/
while(fgets(buf,sizeof buf,rx))
rpn_process(tx,buf);

/*
* close this client's connection:
*/
fclose(tx);
shutdown(fileno(rx),SHUT_RDWR);
fclose(rx);

/*
* Child process must exit:
*/
exit(0);
}

return 0;
}

使用select(2)设计服务器

我 们在前面所提供的服务器程序使用fork程序来处理多个客户端请求,然而还有其他也许是更好的方法。在多个客户端之间共享信息的服务器也许会发现他需要将 服务器包含在一个单一进程中。需要单一进程的另一个需求就在于一个进程并不会消费多个进程所需要的系统资源。正是因为这些原因,需要考虑一个新的服务器设 计。

简介select(2)函数

select函数允许我们阻塞我们服务器的执行,直到有需要服务器来做的事情。更为特别的是,他允许调用知道下列内容:
何时需要从一个文件描述符进行读取
何时写入文件描述符而不阻塞服务器程序的执行
何时在文件描述符上发生例外

我们也许会记起套接口句柄为一个文件描述符。当在指定的连接套接口集合中的任何一个上有事件发生时,select函数会通知服务器。事实上,这会允许服务器以一种高效的方式来处理多个客户端。

正如我们在前面所指出的,当任何新的请求数据由一个客户端套接口到来时,服务器会得到通知。正是这个原因,服务器需要知道何时由一个指定的客户端套接口读取数据。

当 向客户端发送回数据时,对于服务器而言,他知道向套接口写入数据而不阻塞是十分重要的。例如,如果一个连接的客户端请求返回大量的信息,服务器将会向这个 套接口写入这些信息。如果客户端软件有缺陷,或是慢速读取数据,服务器就会阻塞一段时间,并试着写入剩余的结果数据。这就会使得连接到这个服务器上的其他 客户端也必须等待。这正是我们所不希望的,因为每一个客户端必须尽可能快速的进行响应。

如果我们的服务器必须同时处理超边数据,那么我们也许会对发生在套接口上的例外感兴趣。

select函数概要如下:
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int n,
fd_set *readfds,
fd_set *writefds,
fd_set *exceptfds,
struct timeval *timeout);

这个函数需要五个输入参数:
1 要测试的文件描述符的最大个数(n)。这个值至少为最大的文件描述符值加1,因为文件描述符由0开始。
2 为读取数据测试的文件描述符集合(readfds)。
3 为写入数据测试的文件描述符集合(writefds)。
4 为例外测试的文件描述符集合(exceptfds)。
5 指向应用在这个函数调用上的超时条件指针(timeout)。这个指针可以为NULL,表明没有超时(这个函数也许会永远阻塞)。

select函数的返回结果总结如下:
-1 表明函数调用发生错误。错误号存放在errno变量中。
0表明发生了超时而没有发生其他的错误。
大于0表明事件发生的文件描述符的数目。

timeval结构

最后一个参数timeout指向一个必须进行初始化的结构,除非指定了一个NULL指针。timeval结构定义如下:
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
要建立一个1.75秒的超时值,我们可以用下面的代码来完成:
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 750000;

处理文件描述符集合

select函数中的第二个,第三个以及第四个函数需要fd_set类型的值,这对于我们也许是新的内容。这是一个不透明的数据类型,因为他需要使用提供的宏进行操作。我们可用的宏的概要如下:
FD_ZERO(fd_set *set);
FD_SET(int fd, fd_set *set);
FD_CLR(int fd, fd_set *set);
FD_ISSET(int fd, fd_set *set);

这些C宏允许我们操作文件描述符集合。下面的部分将会详细描述这些宏。

使用FD_ZERO宏

这个C宏用来初始化一个文件描述符集合。在我们注册文件描述符之前(包括套接口),我们必须将其全部初始化为0。要初始化一个名为read_socks和write_socks的文件描述符集合,我们可以用下面的C语句:
fd_set read_socks;
fd_set write_socks;
FD_ZERO(&read_socks);
FD_ZERO(&write_socks);

前两条语句声明了与文件描述符集合相关的存储区。后两条语句使用FD_ZERO宏来将其初始化为空集合。换句话说,在执行FD_ZERO之后,在这个集合中并没有注册的文件描述符。

使用FD_SET宏

在我们使用了FD_ZERO宏初始化了一个文件描述符集合之后,我们需要做的下一件事情就是要在其中注册一些文件描述符。这可以用FD_SET宏来完成。下面的例子显示了一个套接口c是如何在名为read_socks集合中进行注册的:
int c; /* Client socket */
fd_set read_socks; /* Read set */
...
FD_SET(c,&read_socks);

在调用了FD_SET之后,就在引用的集合中注册了与文件描述符相应的位。

使用FD_CLR宏

这个C宏撤销FD_SET宏的操作。再一次假设套接口c,如果调用程序希望从这个集合中移除这个描述符,我们可以执行下面的代码:
int c; /* Client socket */
fd_set read_socks; /* Read set */
...
FD_CLR(c,&read_socks);

FD_CLR宏有清除文件描述相应位的功能。注意,这个宏与FD_ZERO不同,因为他只是清除集合中一个指定的文件描述符。而FD_ZERO宏是将集合中的所有位清零。

使用FD_ISSET宏测试文件描述

有时需要进行测试来查看一个指定的文件描述符是否在这个集合中(也就是查看相应的位是否置为1)。要测试是否设置了套接口c,我们可以编写下面的代码:
int c; /* Client socket */
fd_set read_socks; /* Read set */
...
if ( FD_ISSET(c,&read_socks) ) {
/* Socket c is in the set */
...
} else {
/* Socket c is not in the set */
...
}

if语句调用FD_ISSET宏来查看套接口c是否存在于文件描述符集合read_socks中。如果测试返回真,那么在集合中存在与套接口c相应的位,然后执行第一个C代码块。否则,套接口c并不是集合的一部分,那么就要执行else语句块。

在服务器上应用select函数

前面的内容详细的描述了select函数。现在需要将这个函数应用在一个例子中。下面修改的RPN计算服务器的例子将会使用select函数只读取事件。这样的限制是为了保持程序例子的相对短小与易于理解。这个演示的限制将会在后面进行详细的讨论。

RPN服务器需要引擎模块rpneng.c做一些修改,这些修改反应在新的模块rpneng2.c中。在这里我们并没有列出全部的代码,而只是用diff显示了内容上的小改动:
$ diff -c rpneng.c rpneng2.c
*** rpneng.c Mon Sep 13 22:13:56 1999
--- rpneng2.c Wed Sep 15 21:55:20 1999
***************
*** 18,25 ****
* RPN Stack;
*/
#define MAX_STACK 32
! static mpz_t *stack[MAX_STACK];
! static int sp = 0;
/*
* Allocate a new mpz_t value:
--- 18,25 ----
* RPN Stack:
*/
#define MAX_STACK 32
! mpz_t **stack;
! int sp = 0;
/*
* Allocate a new mpz_t value:
***************
*** 45,51 ****
/*
* Free an allocated mpz_t value:
*/
! static void
rpn_free(mpz_t **v) {
mpz_clear(**v);
free(*v);
--- 45,51 ----
/*
* Free an allocated mpz_t value:
*/
! void
rpn_free(mpz_t **v) {
mpz_clear(**v);
free(*v);
$

rpneng2.c模块的主要改变在于RPN栈数组(变量stack)以及栈指针(变量sp)声明为外部变量。静态函数rpn_free同样做为变部函数。这允许程序可以由主程序源码模块中访问变量和函数。
/*rpnsrv2.c
*
* Example RPN Server
* using select(2):
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/wait.h>
#include <gmp.h>

#ifndef SHUT_RDWR
#define SHUT_RDWR 3
#endif

extern int mkaddr(void *addr,
int *addr_len,
char *input_address,
char *protocol);

extern void rpn_process(FILE *tx,char *buf);

extern void rpn_free(mpz_t **v);

#define MAX_STACK 32
#define MAX_CLIENTS 64

/*
* Declared in rpneng2.c
*/
extern mpz_t **stack;
extern in sp;

/*
* Client context Info:
*/
typedef struct
{
mpz_t **stack; /* Stack Array */
int sp; /* Stack prt */
FILE *rx; /* Recv FILE */
FILE *tx; /* Xmit FILE */
}ClientInfo;

ClientInfo client[MAX_CLIENTS];

/*
* This function rports the error and
* exits back to the shell:
*/
static void bail (const char *on_what)
{
if(errno != 0)
{
fputs(strerror(errno),stderr);
fputs(": ",stderr);
}
fputs(on_what,stderr);
fputc('/n',stderr);
exit(1);
}

/*
* Process client c:
*/
static int process_client(int c)
{
char buf[4096]; /* I/O Buffer */
FILE *rx = client[c].rx;
FILE *tx = client[c].tx;

/*
* Install correct RPN stack:
*/
stack = client[c].stack;
sp = client[c].sp;

/*
* If not EOR,process on lien:
*/
if(!feof(rx) && fgets(buf,sizeof buf,rx))
rpn_process(tx,buf);

if(!feof(rx))
{
/*
* Save SP and exit
*/
client[c].sp = sp;
return 0;
}

/*
* Close this clients connection:
*/
fclose(tx);
shutdown(fileno(rx),SHUT_RDWR);
fclose(rx);

client[c].rx = client[c].tx = NULL;

while(sp > 0)
rpn_free(&stack[--sp]);
free(stack);

client[c].stack = NULL;
client[c].sp = 0;

return EOF;
}

/*
* Mail Program:
*/
int main(int argc,char **argv)
{
int z;
char *srvr_addr = "127.0.0.1:9090";
struct sockaddr_in adr_srvr; /* AF_INET */
struct sockaddr_in adr_clnt; /* AF_INET */
int len_inet; /* length */
int s = -1; /* Socket */
int c = -1; /* Client Socket */
int n; /* return val from select */
int mx; /* max fd+1 */
fd_set rx_set; /* Read set */
fd_set wk_set; /* working set */
struct timeval tv; /* timeout value */

/*
* Initialize client structure:
*/
for (z=0;z<MAX_CLIENTS;++z)
{
client[z].stack = NULL;
client[z].sp = 0;
client[z].rx = NULL;
client[z].tx = NULL;
}

/*
* use a server address from the command
* line,otherwise default to 127.0.0.1:
*/
if(argc>=2)
srvr_addr = argv[1];

len_inet = sizeof adr_srvr;
z = mkaddr(&adr_srvr,&len_inet,srvr_addr,"tcp");

if(z < 0 || !adr_srvr.sin_port)
{
fprintf(stderr,"Invalid server"
"address, or no port number "
"was specified./n");
exit(1);
}

/*
* Create a TCP/IP socket to use:
*/
s = socket(PF_INET,SOCK_STREAM,0);
if(s==-1)
bail("socket()");

/*
* Bind the server address:
*/
z = bind(s,(struct sockaddr *)&adr_srvr,len_inet);
if(z==-1)
bail("bind()");

/*
* make it a listening socket:
*/
z = listen(s,10);
if(z == -1)
bail("listen()");

/*
* Express interest in socket
* s for read events:
*/
FD_ZERO(&rx_set); /* init */
FD_SET(s,&rx_set); /* + s */
mx = s+1; /* max fd+1 */

/*
* Start the server loop
*/
for(;;)
{
/*
* copy the rx_set to wk_set:
*/
FD_ZERO(&wk_set);
for(z=0;z<mx;++z)
{
if(FD_ISSET(z,&rx_set))
FD_SET(z,&wk_set);
}

/*
* Sample timeout of 2.03 secs:
*/
tv.tv_sec = 2;
tv.tv_usec = 30000;

n = select(mx,&wk_set,NULL,NULL,&tv);

if(n==-1)
{
fprintf(stderr,"%s:select(2)/n",
strerror(errno));
exit(1);
}
else if(!n)
{
puts("Timeout");
continue;
}

/*
* Check if a connect has occured:
*/
if(FD_ISSET(s,&wk_set))
{
/*
* Wait for a connect:
*/
len_inet = sizeof adr_clnt;
c = accept(s,(struct sockaddr *)&adr_clnt,&len_inet);
if(c==-1)
bail("accept(2)");

/*
* See if we've exceeded server
* capacity.if so,close the
* socket and wait for the
* next event:
*/
if(c>=MAX_CLIENTS)
{
close(c);
continue;
}

/*
* Create streams:
*/
client[c].rx = fdopen(c,"r");
if(!client[c].rx)
{
close(c);
continue;
}

client[c].tx = fdopen(dup(c),"w");
if(!client[c].tx)
{
fclose(client[c].rx);
continue;
}

if(c+1 >mx)
mx = c+1;

/*
* set both streams to line
* buffered mode:
*/
setlinebuf(client[c].rx);
setlinebuf(client[c].tx);

/*
* Allocate a stack:
*/
client[c].sp = 0;
client[c].stack = (mpz_t **)malloc(sizeof (mpz_t *) *MAX_STACK);
FD_SET(c,&rx_set);
}

/*
* Check for client activity:
*/
for(c = 0;c<mx;++c)
{
if(c==s)
continue;
if(FD_ISSET(c,&wk_set))
{
if(process_client(c) == EOF)
{
FD_CLR(c,&rx_set);
}
}
}

/*
* Reduce mx if we are able to :
*/
for(c=mx-1;
c >= 0 && !FD_ISSET(c,&rx_set);
c = mx-1)
mx = c;
}

return 0;
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics