浅谈SWOOLE协程篇

阅读本文需要以下知识点

  • 了解进程、线程相关基础
  • 熟练php的hello world输出
  • 会swoole单词拼写

协程的介绍

协程是什么?

A coroutine is a function that can suspend its execution (yield) until the given given YieldInstruction finishes.

简单的说协程是寄宿在线程下程序员实现的一种跟更轻量的并发的协作轻量线程

随着程序员人群的增大,大佬也不断的爆发式增长,当然就开始有人觉得线程不好用了,那怎么办呢?当然是基于线程的理念上再去实现一套更加轻量、更好骗star的一套轻量线程(事实上协程不能完全被认为线程,因为一个线程可以有多个协程)

协程和线程的区别

本质

线程 内核态
协程 用户态

调度方式

线程的调度方式为系统调度,常用的调度策略有分时调度抢占调度。说白就是线程的调度完全不受自己控制

协程的调度方式为协作式调度 不受内核控制由自由策略调度切换

等等

协作式调度?

上述说了协程是用户态的,所以所谓的协作式调度直接可以理解为是程序员写的调度方式,也就是我想怎么调度就怎么调度,而不用通过系统内核被调度。

~~深。。。。~~浅入理解swoole的协程

既然打算浅入理解的swoole的协程,我们必须要知道swoole的协程模型。
swoole的协程是基于单线程。可以理解为协程的切换是串行的,再同一个时间点只运行一个协程.

说到这里,肯定就有人问了。go呢,go的协程的是基于多线程。当然各有各的好处,具体可以自行使用搜索引擎了解

我们可以直接copy & paste 下面代码,再本地的环境进行的 demo run

<?php

$func = function ($index, $isCorotunine = true) {
    $isCorotunine && \Swoole\Coroutine::sleep(2);
    echo "index:" . $index . ", value:" . (++$count) . PHP_EOL;
    echo "is corotunine:" . intval($isCorotunine) . PHP_EOL;
};

$func(1, false);
go($func, 2, true);
go($func, 3, true);
go($func, 4, true);
go($func, 5, true);
go($func, 6, true);
$func(7, false);    

会得到以下结果

index:1, value:1
is corotunine:0
index:7, value:2
is corotunine:0
index:2, value:3
is corotunine:1
index:6, value:4
is corotunine:1
index:5, value:5
is corotunine:1
index:4, value:6
is corotunine:1
index:3, value:7
is corotunine:1

肯定有人会想,哇塞,尽然1秒都执行完了,一点都不堵塞啊!!

好了,事实上关于1秒执行完的事情可以回过头再去看下协程的概念。
我们可以关注的是执行顺序,1和7是非协程的执行能立马返回结果符合预期。
关于协程的调度顺序
为什么是26543不是65432或者23456有序的返回呢

为了找到我们的答案,我们只能通过源码进行知晓一些东西

分析源码

image

图来自https://segmentfault.com/a/1190000019089997?utm_source=tag-newest

如果没有较强的基础还有啃烂的apcu的前提下(当然我也没有!T_T)
我们需要关心的是以下两个
yield 一个是协程的让出CPU
resume 恢复协程

我们可以自己思考下,如果让你设计协程,需要考虑哪些方面?

协程结构图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iLgbTXO9-1593482951922)(/img/bVbIP6V)]

协程的创建

<?php
go (function(){
echo "swoole 太棒了";
});

调用的swoole封装给PHPgo函数为创建一个协程

我们根据拓展源码中的

大部分的PHP扩展函数以及扩展方法的参数声明放在swoole_*.ccswoole.cc里面。

PHP_FALIAS(go, swoole_coroutine_create, arginfo_swoole_coroutine_create)

可以知道 go->swoole_coroutine_create

在swoole_coroutine.cc文件里找到

PHP_FUNCTION(swoole_coroutine_create)
{
    ....
    // 划重点 要考
    long cid = PHPCoroutine::create(&fci_cache, fci.param_count, fci.params);
    ....
}

long PHPCoroutine::create(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv)
{
    if (sw_unlikely(Coroutine::count() >= config.max_num))
    {
        php_swoole_fatal_error(E_WARNING, "exceed max number of coroutine %zu", (uintmax_t) Coroutine::count());
        return SW_CORO_ERR_LIMIT;
    }

    if (sw_unlikely(!active))
    {
        // 划重点 要考
        activate();
    }

    // 保存回调函数
    php_coro_args php_coro_args;
    //函数信息
    php_coro_args.fci_cache = fci_cache;
    //参数
    php_coro_args.argv = argv;
    php_coro_args.argc = argc;
    // 划重点 要考
    save_task(get_task());

    // 划重点 要考
    return Coroutine::create(main_func, (void*) &php_coro_args);
}

// 保存栈 (入栈)
void PHPCoroutine::save_task(php_coro_task *task)
{
    save_vm_stack(task);
    save_og(task);
}
// 初始化reactor的事件
inline void PHPCoroutine::activate()
{
    if (sw_unlikely(active))
    {
        return;
    }

    /* init reactor and register event wait */
    php_swoole_check_reactor();

    /* replace interrupt function */
    orig_interrupt_function = zend_interrupt_function;
    zend_interrupt_function = coro_interrupt_function;
    
    /* replace the error function to save execute_data */
    orig_error_function = zend_error_cb;
    zend_error_cb = error;

    if (config.hook_flags)
    {
        enable_hook(config.hook_flags);
    }

    if (SWOOLE_G(enable_preemptive_scheduler) || config.enable_preemptive_scheduler)
    {
        /* create a thread to interrupt the coroutine that takes up too much time */
        interrupt_thread_start();
    }

    if (!coro_global_active)
    {
        if (zend_hash_str_find_ptr(&module_registry, ZEND_STRL("xdebug")))
        {
            php_swoole_fatal_error(E_WARNING, "Using Xdebug in coroutines is extremely dangerous, please notice that it may lead to coredump!");
        }

        /* replace functions that can not work correctly in coroutine */
        inject_function();

        coro_global_active = true;
    }
    /**
     * deactivate when reactor free.
     */
    swReactor_add_destroy_callback(SwooleG.main_reactor, deactivate, nullptr);
    active = true;
}

根据Coroutine::create继续往下跳转

    static inline long create(coroutine_func_t fn, void* args = nullptr)
    {
        return (new Coroutine(fn, args))->run();
    }

在创建完协程后立马执行
我们观察下构造方法

    Coroutine(coroutine_func_t fn, void *private_data) :
            ctx(stack_size, fn, private_data)
    {
        cid = ++last_cid;
        coroutines[cid] = this;
        if (sw_unlikely(count() > peak_num))
        {
            peak_num = count();
        }
    }

上述代码我可以发现还有一个Context的类 这个构造函数我们可以猜到做了3件事情

  1. 分配对应协程id (每个协程都有自己的id)
  2. 保存上下文
  3. 更新当前的协程的数量

swoole使用的协程库为 boost.context 可自行搜索
主要暴露的函数接口为jump_fcontextmake_fcontext
具体的作用保存当前执行状态的上下文暂停当前的执行状态够跳转到其他位置继续执行

执协程

inline long run()
    {
        long cid = this->cid;
        origin = current;
        current = this;
        // 依赖boost.context 切栈
        ctx.swap_in();
        // 判断是否执行结束
        check_end();
        return cid;
    }

判断是否结束

inline void check_end()
    {
        if (ctx.is_end())
        {
            close();
        }
        else if (sw_unlikely(on_bailout))
        {
            SW_ASSERT(current == nullptr);
            on_bailout();
            // expect that never here
            exit(1);
        }
    }

根据ctx.is_end()的函数找到

    inline bool is_end()
    {
        return end_;
    }
bool Context::swap_in()
{
    jump_fcontext(&swap_ctx_, ctx_, (intptr_t) this, true);
    return true;
}

我们可以总结下swoole在创建协程的时候主要做了哪些事情

  1. 检测环境
  2. 解析参数
  3. 保存上下文
  4. 切换C栈
  5. 执行协程

协程的yield

上述的demo我们使用\Swoole\Coroutine::sleep(2)
根据上述说函数申明的我们在swoole_corotunine_system.h发现对应的文件为swoole_coroutine_systemsleep的函数

PHP_METHOD(swoole_coroutine_system, sleep)
{
    double seconds;

    ZEND_PARSE_PARAMETERS_START(1, 1)
        Z_PARAM_DOUBLE(seconds)
    ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

    if (UNEXPECTED(seconds < SW_TIMER_MIN_SEC))
    {
        php_swoole_fatal_error(E_WARNING, "Timer must be greater than or equal to " ZEND_TOSTR(SW_TIMER_MIN_SEC));
        RETURN_FALSE;
    }
    System::sleep(seconds);
    RETURN_TRUE;
}

调用了sleep函数之后对当前的协程做了三件事
1.增加了timer定时器
2.注册回掉函数再延迟之后resume协程
3.通过yield让出调度

int System::sleep(double sec)
{
// 获取当前的协程
    Coroutine* co = Coroutine::get_current_safe();
   //swTimer_add 注册定时器 sleep_timeout回调的函数
   if (swTimer_add(&SwooleG.timer, (long) (sec * 1000), 0, co, sleep_timeout) == NULL)
    {
        return -1;
    }
    // 让出当前cpu
    co->yield();
    return 0;
}

// 回调函数
static void sleep_timeout(swTimer *timer, swTimer_node *tnode)
{
   // 恢复调度
    ((Coroutine *) tnode->data)->resume();
}

swTimer_node* swTimer_add(swTimer *timer, long _msec, int interval, void *data, swTimerCallback callback)
{
    ....
    // 保存当前上下文和对应过期时间
    tnode->data = data;
    tnode->type = SW_TIMER_TYPE_KERNEL;
    tnode->exec_msec = now_msec + _msec;
    tnode->interval = interval ? _msec : 0;
    tnode->removed = 0;
    tnode->callback = callback;
    tnode->round = timer->round;
    tnode->dtor = NULL;

    // _next_msec保存最快过期的事件
    if (timer->_next_msec < 0 || timer->_next_msec > _msec)
    {
        timer->set(timer, _msec);
        timer->_next_msec = _msec;
    }

    tnode->id = timer->_next_id++;
    if (sw_unlikely(tnode->id < 0))
    {
        tnode->id = 1;
        timer->_next_id = 2;
    }

    tnode->heap_node = swHeap_push(timer->heap, tnode->exec_msec, tnode);
    ....
    timer->num++;
    return tnode;
}

协程的切换

我们

void Coroutine::resume()
{
    SW_ASSERT(current != this);
    if (sw_unlikely(on_bailout))
    {
        return;
    }
    state = SW_CORO_RUNNING;
    if (sw_likely(on_resume))
    {
        on_resume(task);
    }
    // 将当前的协程保存为origin -> 理解程previous
    origin = current;
    // 需要执行的协程 变成 current
    current = this;
    // 入栈执行
    ctx.swap_in();
    check_end();
}

到这里时候 关于协程调用顺序的答案已经出来了

在创建协程的时候(new Coroutine(fn, args))->run();sleep触发yield都在不断变更的Corotuninecurrentorigin 再执切换的时候和php代码创建协程的时间发生穿插,而不是我们想象中的队列有序执行
比如当创建协程只有2个的时候

<?php

$func = function ($index, $isCorotunine = true) {
    $isCorotunine && \Swoole\Coroutine::sleep(2);
    echo "index:" . $index . PHP_EOL;
    echo "is corotunine:" . intval($isCorotunine) . PHP_EOL;
};

$func(1, false);

go($func, 2, true);
go($func, 3, true);

返回输出 因为连续创建协程的执行时间小没有被打乱

php swoole_go_demo1.php
index:1
is corotunine:0
index:2
is corotunine:1
index:3
is corotunine:1

当连续创建的时候200个协程的时候
返回就变得打乱的index 符合预计猜想

index:1,index:202,index:1,index:2,index:4,index:8,index:16,index:32,index:64,index:128,index:129,index:65,index:130,index:131,index:33,index:66,index:132,index:133,index:67,index:134,index:135,index:17,index:34,index:68,index:136,index:137,index:69,index:138,index:139,index:35,index:70,index:140,index:141,index:71,index:142,index:143,index:9,index:18,index:36,index:72,index:144,index:145,index:73,index:146,index:147,index:158,index:157,index:156,index:155,index:154,index:153,index:152,index:151,index:37,index:74,index:148,index:149,index:75,index:150,index:19,index:38,index:76,index:77,index:39,index:78,index:79,index:5,index:10,index:20,index:40,index:80,index:81,index:41,index:82,index:83,index:21,index:127,index:126,index:125,index:124,index:123,index:122,index:121,index:120,index:119,index:118,index:117,index:116,index:115,index:114,index:113,index:112,index:111,index:110,index:109,index:108,index:107,index:106,index:105,index:104,index:103,index:102,index:101,index:100,index:99,index:98,index:97,index:96,index:95,index:94,index:93,index:92,index:91,index:90,index:89,index:88,index:87,index:42,index:84,index:85,index:43,index:86,index:11,index:22,index:44,index:45,index:23,index:46,index:47,index:3,index:6,index:12,index:24,index:48,index:49,index:25,index:50,index:51,index:13,index:26,index:63,index:62,index:61,index:60,index:59,index:58,index:57,index:56,index:55,index:52,index:53,index:27,index:54,index:7,index:14,index:28,index:29,index:15,index:30,index:31,index:200,index:199,index:192,index:185,index:175,index:168,index:161,index:163,index:172,index:179,index:187,index:194,index:174,index:160,index:173,index:176,index:198,index:195,index:180,index:167,index:169,index:184,index:197,index:193,index:177,index:162,index:171,index:186,index:182,index:164,index:191,index:183,index:166,index:196,index:178,index:170,index:189,index:188,index:165,index:181,index:190,index:159

最后彩蛋

我们使用GO的协程的来实现上述的demo

package main

import (
	"fmt"
	"time"
)

var count int = 0

func main() {
	output(false, 1)

	go output(true, 2)
	go output(true, 3)
	go output(true, 4)
	go output(true, 5)
	go output(true, 6)

	output(false, 7)

	time.Sleep(time.Second)
}

func output(isCorotunine bool, index int) {
	time.Sleep(time.Second)
	count = count + 1
	fmt.Println(count, isCorotunine, index)
}

猜猜返回结果是如何的 可以根据go的协程基于多线程的方式再去研究下
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p0GMooWU-1593482951925)(/img/bVbISI7)]

写给最后,文章纯属自己根据代码和资料理解,如果有错误麻烦提出来,倍感万分,如果因为一些错误的观点被误导我只能说


版权声明:本文为u012841175原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。