swoole的TCP服务器实现-swoole_server创建过程

这里只介绍源码实现,如果对于swoole的了解和使用,请参考官方网站:https://www.swoole.com/,这里感谢开源,有这些优秀的产品大家可以尽情的学习。要看swoole的实现,需要下载swoole的代码,下载地址为:https://gitee.com/swoole/swoole/tree/v4.0.3 下载解压后,可以导入到cdt或者source-insight去阅读,本系列文章基于swoole 4.0.3分享,我用的代码阅工具为eclise-cdt,我的代码放置路径为:E:\swoole-src-master

因swoole是PHP扩展,在看swoole代码时,会插入一些PHP扩展实现的API介绍,后面我会写一篇PHP扩展相关的文章,来总结下swoole里面用到的扩展API。

下载swoole代码后导入后的目录图如下(编辑器导入后,一屏幕截图不了,这里贴的是windows下展示)。

现在结合swoole官网提供出来的Demo做分享,下面代码就是Demo里TCP-Server的实例。

//创建Server对象,监听 127.0.0.1:9501端口
$serv = new swoole_server("127.0.0.1", 9501); 

//监听连接进入事件
$serv->on('connect', function ($serv, $fd) {  
    echo "Client: Connect.\n";
});

//监听数据接收事件
$serv->on('receive', function ($serv, $fd, $from_id, $data) {
    $serv->send($fd, "Server: ".$data);
});

//监听连接关闭事件
$serv->on('close', function ($serv, $fd) {
    echo "Client: Close.\n";
});

//启动服务器
$serv->start(); 

第一行代码,首先是定义swoole_server的对象,传入了两个参数,其中一个参数是服务器的IP地址,为字符串类型,另外一个是服务器的端口信息,为整型信息,而在官方文档中可以找到,swoole_server的完整的构造函数如下:

$serv = new swoole_server(string $host, int $port = 0, int $mode = SWOOLE_PROCESS,
    int $sock_type = SWOOLE_SOCK_TCP);

也就是mode和sock_type是有默认值的,mode代表server的进程模式,这里默认为多进程,sock_type为服务器类型,这里默认为TCP类型的,关于swoole_server的完整定义的说明可以参考官方文档:https://wiki.swoole.com/wiki/page/14.html

在swoole扩展代码中找到这个相关swoole_server的定义,这个还是很好找,具体路径为:E:\swoole-src-master\swoole_server.c,也就是在代码根目录下。因现在swoole_server是个对象,按PHP扩展的API,那在PHP扩展的实现中肯定是通过PHP_METHOD的方式去实现,且对应到现在的例子,完整的函数名定义为:

PHP_METHOD(swoole_server, __construct)

在swoole-server.c文件中,通过编辑器的搜索即可找到其完整的实现,我的编辑器中是从1910行开始,如下图所示:

该方法完整的代码如下:

PHP_METHOD(swoole_server, __construct)
{
    zend_size_t host_len = 0;//用于后续获取服务器IP地址信息时,存在IP地址长度信息
    char *serv_host;//获取服务器的IP地址信息,类型为字符串,构造PHP的swoole_server对象时传入,必传
    long sock_type = SW_SOCK_TCP;//server对应的Socket类型,构造PHP的swoole_server对象时传入,可选
    long serv_port = 0;//用于存放服务器端口信息,构造PHP的swoole_server对象时传入,必传
    long serv_mode = SW_MODE_PROCESS;//server的运行方式,构造PHP的swoole_server对象时传入,必传

    //only cli env,这里sapi_module是PHP内部实现的模块名,这里判断swoole_server只能运行在cli模式下,否则启动失败,sapi_module.name为PHP运行方式,也就是说swoole_server不能运行在fast-cgi模式下。
    if (strcasecmp("cli", sapi_module.name) != 0)
    {
        swoole_php_fatal_error(E_ERROR, "swoole_server only can be used in PHP CLI mode.");
        RETURN_FALSE;
    }

    //swoole的主事件模块已经启动,不运行重复启动,SwooleG这个模块的初始化和设置值,在后面的逻辑中,暂时先分析到这里,后面具体分析。
    if (SwooleG.main_reactor != NULL)
    {
        swoole_php_fatal_error(E_ERROR, "eventLoop has already been created. unable to create swoole_server.");
        RETURN_FALSE;
    }

    //一个PHP程序只能启动一个swoole_server,SwooleG这个模块的初始化和设置值,在后面的逻辑中,暂时先分析到这里,后面具体分析。
    if (SwooleG.serv != NULL)
    {
        swoole_php_fatal_error(E_WARNING, "server is running. unable to create swoole_server.");
        RETURN_FALSE;
    }

    //swServer是PHP的swoole_server对象的一个抽象,通过sw_malloc申请内存空间,其实sw_malloc的实现就是c的malloc申请空间的。
    swServer *serv = sw_malloc(sizeof (swServer));
    //swServer模块的初始化,请看下面单独的分析。
    swServer_init(serv);
    //PHP扩展的API,用于从PHP层获取输入参数信息,这里总共获取了serv_host,serv_port,serv_mode,sock_type信息
    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|lll", &serv_host, &host_len, &serv_port, &serv_mode, &sock_type) == FAILURE)
    {
        swoole_php_fatal_error(E_ERROR, "invalid swoole_server parameters.");
        return;
    }
//如果是windows环境下,serv_mode取值为SW_MODE_SINGLE(单进程)模式,具体的后续再分析
#ifdef __CYGWIN__
    serv_mode = SW_MODE_SINGLE;
#elif !defined(SW_USE_THREAD)//当前不支持多线程模式
    //如果server的模式为SW_MODE_THREAD 和 SW_MODE_BASE 模式,则调整为SW_MODE_SINGLE(单进程)模式
    if (serv_mode == SW_MODE_THREAD || serv_mode == SW_MODE_BASE)
    {
        serv_mode = SW_MODE_SINGLE;
        swoole_php_fatal_error(E_WARNING, "can't use multi-threading in PHP. reset server mode to be SWOOLE_MODE_BASE");
    }
#endif
    //serv用factory_mode属性保存serv_mode值
    serv->factory_mode = serv_mode;
    //如果server为单进程模式,则worker进程个数设置为1,且max_request为0,也就是从不退出,因为退出后,就没有worker进程服务了。
    if (serv->factory_mode == SW_MODE_SINGLE)
    {
        serv->worker_num = 1;//worker进程个数
        serv->max_request = 0;
    }
    //全局变量php_sw_server_callbacks的初始,这个表示回调函数指针
    bzero(php_sw_server_callbacks, sizeof (zval*) * PHP_SERVER_CALLBACK_NUM);
    
    if (serv_port == 0 && strcasecmp(serv_host, "SYSTEMD") == 0)//如果server对于的端口号为0,且主机名为SYSTEMD,这种场景比较少见
    {
        if (swserver_add_systemd_socket(serv) <= 0)//系统自动绑定端口号和主机名,这里不展开讨论,需要了解的请自行去了解
        {
            swoole_php_fatal_error(E_ERROR, "failed to add systemd socket.");
            return;
        }
    }
    else//大众化的场景,即指定了端口号和主机名信息
    {
        //创建server socket,并且bind端口号信息,后面专门展开分析
        swListenPort *port = swServer_add_port(serv, sock_type, serv_host, serv_port);
        if (!port)
        {
            zend_throw_exception_ex(swoole_exception_class_entry_ptr, errno TSRMLS_CC, "failed to listen server port[%s:%ld]. Error: %s[%d].",
                    serv_host, serv_port, strerror(errno), errno);
            return;
        }
    }
    //PHP扩展的API,getThis()用于获取PHP当前的对象,也就是swoole_server
    zval *server_object = getThis();

#ifdef HAVE_PCRE
    zval *connection_iterator_object;
    SW_MAKE_STD_ZVAL(connection_iterator_object);
    object_init_ex(connection_iterator_object, swoole_connection_iterator_class_entry_ptr);
    zend_update_property(swoole_server_class_entry_ptr, server_object, ZEND_STRL("connections"), connection_iterator_object TSRMLS_CC);

    swConnectionIterator *i = emalloc(sizeof(swConnectionIterator));
    bzero(i, sizeof(swConnectionIterator));
    i->serv = serv;
    swoole_set_object(connection_iterator_object, i);
#endif
    //设置swoole_server对象的属性值
    zend_update_property_stringl(swoole_server_class_entry_ptr, server_object, ZEND_STRL("host"), serv_host, host_len TSRMLS_CC);
    //设置swoole_server对象的属性值
    zend_update_property_long(swoole_server_class_entry_ptr, server_object, ZEND_STRL("port"), (long) serv->listen_list->port TSRMLS_CC);
    //设置swoole_server对象的属性值
    zend_update_property_long(swoole_server_class_entry_ptr, server_object, ZEND_STRL("mode"), serv->factory_mode TSRMLS_CC);
    //设置swoole_server对象的属性值
    zend_update_property_long(swoole_server_class_entry_ptr, server_object, ZEND_STRL("type"), sock_type TSRMLS_CC);
    //缓存当前server对象,这里的key为swoole_server对象在PHP内部的索引值,而value为serv对象,这里可以最多缓存10000000个对象,缓存空间按2倍去逐步扩容
    swoole_set_object(server_object, serv);
  
    //下述是server监听多端口的逻辑,不做具体分析
    zval *ports;
    SW_ALLOC_INIT_ZVAL(ports);
    array_init(ports);
    server_port_list.zports = ports;

#ifdef HT_ALLOW_COW_VIOLATION
    HT_ALLOW_COW_VIOLATION(Z_ARRVAL_P(ports));
#endif

    swListenPort *ls;
    LL_FOREACH(serv->listen_list, ls)
    {
        php_swoole_server_add_port(serv, ls TSRMLS_CC);
    }

    //设置swoole_server的属性值
    zend_update_property(swoole_server_class_entry_ptr, server_object, ZEND_STRL("ports"), ports TSRMLS_CC);
//初始化swServer和设置默认值
void swServer_init(swServer *serv)
{
    //swoole的初始化,这里也有比较多的内容,单独在下面进行分析
    swoole_init();
    //swServer的空间初始化,初始化为0
    bzero(serv, sizeof(swServer));
    //初始化server的工作模式,这里默认为基础模式,关于这里的定义,后续专门分析
    serv->factory_mode = SW_MODE_BASE;
    //设置server的reactor个个数,个数取SW_REACTOR_NUM和SW_REACTOR_MAX_THREAD的最大值,目前代码中SW_REACTOR_NUM的定义为CPU的个数,SW_REACTOR_MAX_THREAD定义的值为8
    serv->reactor_num = SW_REACTOR_NUM > SW_REACTOR_MAX_THREAD ? SW_REACTOR_MAX_THREAD : SW_REACTOR_NUM;
    //server的调度模式,关于这里的定义,后续专门分析
    serv->dispatch_mode = SW_DISPATCH_FDMOD;
    //server的工作进程个数,默认取CPU的个数
    serv->worker_num = SW_CPU_NUM;
    //server最大可以打开的文件个数,这里应该理解为最大连接数,取SwooleG.max_sockets和SW_SESSION_LIST_SIZE的最大值,其中SW_SESSION_LIST_SIZE值为1024*1024,而max_sockets取的是系统的软限制
    serv->max_connection = SwooleG.max_sockets < SW_SESSION_LIST_SIZE ? SwooleG.max_sockets : SW_SESSION_LIST_SIZE;
    //设置worker进程的最大任务数,默认为0,一个worker进程在处理完超过此数值的任务后将自动退出,进程退出后会释放所有内存和资源,用于解决内存泄露的,这里初始化为0。
    serv->max_request = 0;
    //server的最大等待时间,设置为30s,这个只是一个常量,应该在很多地方会用到,用来设置超时时间
    serv->max_wait_time = SW_WORKER_MAX_WAIT_TIME;

    //http server的配置,后面用到了再看
    serv->http_parse_post = 1;
    //http server上传文件的目录,这里取的是/tmp目录
    serv->upload_tmp_dir = sw_strdup("/tmp");

    //server的心跳检测相关
    serv->heartbeat_idle_time = SW_HEARTBEAT_IDLE;//心跳存活最大时间
    serv->heartbeat_check_interval = SW_HEARTBEAT_CHECK;//心跳定时侦查时间
    //server的buffer大小,这里有进和出的buffer,具体用到了再看,SW_BUFFER_INPUT_SIZE和SW_BUFFER_OUTPUT_SIZE的值都为2M
    serv->buffer_input_size = SW_BUFFER_INPUT_SIZE;
    serv->buffer_output_size = SW_BUFFER_OUTPUT_SIZE;
    //设置task进程和worker进程的通信方式,这里取默认值为unix socket的方式
    serv->task_ipc_mode = SW_TASK_IPC_UNIXSOCK;

    //从内存池申请空间且初始化全局变量swServerStats
    serv->stats = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swServerStats));
    if (serv->stats == NULL)
    {
        swError("[Master] Fatal Error: failed to allocate memory for swServer->stats.");
    }
    //从内存池申请空间且初始化全局变量swServerGS
    serv->gs = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swServerGS));
    if (serv->gs == NULL)
    {
        swError("[Master] Fatal Error: failed to allocate memory for swServer->gs.");
    }
    //全局变量设置属性值serv,取已经初始化好的serv
    SwooleG.serv = serv;
}
//swoole的全局变量初始化
void swoole_init(void)
{
    //linux资源限制描述符,后面有用到时再分析
    struct rlimit rlmt;
    //如果SwooleG的running已经启动,在跳过,关于SwoolG的定义和初始化就在下面
    if (SwooleG.running)
    {
        return;
    }

    //全局变量SwooleG的初始化,bzero会将SwooleG的全部地址空间置0,关于SwooleG全局变量,后面再分析,这里只需要知道SwooleG为swoole_server的全局配置变量。
    bzero(&SwooleG, sizeof(SwooleG));
    //全局变量SwoolWG的初始化,这里将SwooleWG的全部地址空间置0,关于SwooleWG全局变量,后面再分析,这里只需要知道SwooleWG为swoole_worker的全局配置值
    bzero(&SwooleWG, sizeof(SwooleWG));
    //全局变量sw_error的初始化,这里将sw_error的全部地址空间置0,关于sw_error全局变量,后面再分析
    bzero(sw_error, SW_ERROR_MSG_SIZE);

    //标记服务为已启动
    SwooleG.running = 1;
    //标记服务协程相关,后面再分析
    SwooleG.enable_coroutine = 1;
    //全局变量sw_errno的初始化,这里sw_errno为int16_t类型,直接赋值0做初始化
    sw_errno = 0;

    //标记日志文件描述符,这里赋值为标准输出
    SwooleG.log_fd = STDOUT_FILENO;
    //标记cpu个数,通过linux的api获取
    SwooleG.cpu_num = sysconf(_SC_NPROCESSORS_ONLN);
    //标记内存页的大小,通过linux的api获取
    SwooleG.pagesize = getpagesize();
    //记录当前进程号
    SwooleG.pid = getpid();
    //记录socket通信的buffer的size大小,这里SW_SOCKET_BUFFER_SIZE定义为(8*1024*1024),也就是8M
    SwooleG.socket_buffer_size = SW_SOCKET_BUFFER_SIZE;

//通过是否是调试模式,设置日志级别
#ifdef SW_DEBUG
    SwooleG.log_level = 0;
#else
    SwooleG.log_level = SW_LOG_INFO;
#endif

    //初始化获取获取当前内核名称和其它信息
    uname(&SwooleG.uname);

    //初始化随机数种子信息
    srandom(time(NULL));

    //创建全局的内存池,swMemoryGlobal_new函数后续在分析,这个函数第一个参数SW_GLOBAL_MEMORY_PAGESIZE的值为2M,也就是一次性申请的内存池为2M大小,这里内存池是通过单链表的方式去管理,也就是存在多个2M大小的内存池
    SwooleG.memory_pool = swMemoryGlobal_new(SW_GLOBAL_MEMORY_PAGESIZE, 1);
    //内存池申请失败时,退出程序
    if (SwooleG.memory_pool == NULL)
    {
        printf("[Master] Fatal Error: global memory allocation failure.");
        exit(1);
    }
    //从已经申请到的内存池里面分配空间,分配空间大小为sizeof(SwooleGS_t),同时做初始化,空间初始化为全局变量SwoolGS,这里的实现后续单独分析
    SwooleGS = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(SwooleGS_t));
    //内存池分配空间失败时,退出程序
    if (SwooleGS == NULL)
    {
        printf("[Master] Fatal Error: failed to allocate memory for SwooleGS.");
        exit(2);
    }

    //初始化全局变量SwooleGS的锁1,这里用到的是linux下的互斥锁,后续具体分析
    swMutex_create(&SwooleGS->lock, 1);
    //初始化全局变量SwooleGS的锁2,这里用到的是linux下的互斥锁,后续具体分析
    swMutex_create(&SwooleGS->lock_2, 1);
    //初始化全局变量SwooleG的锁,这里用到的是linux下的互斥锁,后续具体分析
    swMutex_create(&SwooleG.lock, 0);

    //获取系统设置的进程可以打开的文件数信息(通过参数RLIMIT_NOFILE控制),获取的信息写入rlmt中
    if (getrlimit(RLIMIT_NOFILE, &rlmt) < 0)//获取失败
    {
        //打印警告日志信息,这里不会做退出程序处理
        swWarn("getrlimit() failed. Error: %s[%d]", strerror(errno), errno);
        //将全局变量SwoolG的max_sockets,即可以打开的文件个数初始化为1024
        SwooleG.max_sockets = 1024;
    }
    else//获取成功
    {
        //全局变量SwoolG的max_sockets属性设置为系统进程可以打开的最大文件个数信息
        SwooleG.max_sockets = (uint32_t) rlmt.rlim_cur;
    }
    //全局变量SwoolTG的属性buffer_stack的初始化,这里初始化为8192个字节大小
    SwooleTG.buffer_stack = swString_new(8192);
    if (SwooleTG.buffer_stack == NULL)//初始化失败,退出程序
    {
        exit(3);
    }
    //如果全局变量SwooleG的task_tmpdir属性未设置,则对该属性做初始化
    if (!SwooleG.task_tmpdir)
    { 
        //调用封装过的strndup做字符串的拷贝,SW_TASK_TMP_FILE的值为/tmp/swoole.task.XXXXXX
        SwooleG.task_tmpdir = sw_strndup(SW_TASK_TMP_FILE, sizeof(SW_TASK_TMP_FILE));
        SwooleG.task_tmpdir_len = sizeof(SW_TASK_TMP_FILE);
    }
    //获取task_tmpdir的上级目录,也就是/tmp目录
    char *tmp_dir = swoole_dirname(SwooleG.task_tmpdir);
    //递归创建目录
    if (access(tmp_dir, R_OK) < 0 && swoole_mkdir_recursive(tmp_dir) < 0)
    {
        swWarn("create task tmp dir(%s) failed.", tmp_dir);
    }
    //tmp_dir字符串是通过strndup创建的,需要主动释放空间,否则有内存泄露的分析
    if (tmp_dir)
    {
        sw_free(tmp_dir);
    }

    //初始化后面用于进程间通信的信号fd,关于fd方式做进程间通信工具的,后面专门介绍。
#ifdef HAVE_SIGNALFD
    swSignalfd_init();
    SwooleG.use_signalfd = 1;
    SwooleG.enable_signalfd = 1;
#endif
    //如果系统存在时间fd,则用fd做通信工具
#ifdef HAVE_TIMERFD
    SwooleG.use_timerfd = 1;
#endif
    //初始化全局变量的属性use_timer_pipe
    SwooleG.use_timer_pipe = 1;
}

 

版权声明:本文为u013702678原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u013702678/article/details/81149972