Laravel 大数据量分块处理

chunk() 的用法

有时,我们可能会一次性查出很大的结果集(上万条记录),这样就很可能导致内存溢出。

Laravel 框架的 chunk() 方法,可以将数据分块,每次只查询指定数量的数据块,交给回调函数处理。从而在处理大量数据集合时,能够有效减少内存消耗。

User::where('status', 0)->chunk(100, function ($users) {
    foreach ($users as $user) {
        $user->update(['name' => 'jack']);
    }
});

其实,chunk() 方法就是分页获取数据块。

chunk() 的注意事项

如果 chunk() 方法的回调函数,内部有自更新,影响到了原始查询条件的记录总数,就会出现遗漏数据的情况。

比如,将上面的

$user->update(['name' => 'jack']);

更改为

$user->update(['status' => 1]);

其实,原因很简单,因为 chunk() 每次分块(分页)都会对原始数据进行重新查询。如果原始数据变了,就可能会影响到分页的偏移。

错误示例:

// 获取待处理的记录
$res = DB::connection('mysql_pms')->table('pms_product_app_map as m')
    ->selectRaw('m.*')
    ->leftJoin('pms_product_push_record as r', function ($join) {
        $join->on('r.app_id', '=', 'm.app_id')
             ->on('r.product_code', '=', 'm.product_code');
    })
    ->whereNull('r.id')  // 过滤掉已经推送成功的商品
    ->orderBy('m.id', 'asc');

$count = $res->count();  // 记录总数
$count_success = [];  // 成功的记录

$res->chunk(100, function ($records) use(&$count_success) {
    foreach ($records as $record) {
        try {
            // todo

            // 推送成功
            // 记录推送成功的记录
            $data = [
                'product_id'   => $record->product_id,
                'updated_at'   => Carbon::now()->toDateTimeString()
            ];
            DB::connection('mysql_pms')->table('pms_product_push_record')->updateOrInsert(['app_id'=>$record->app_id, 'product_code'=>$record->product_code], $data);
            $count_success[] = $record->id;
        } catch (\Exception $e) {
            // todo
        }
    }
});

因此,如果 chunk() 方法的回调函数内部有自更新,影响了记录总数,就不要使用 chunk() 方法。

这时,推荐使用 cursor() 方法。

cursor() 的用法

cursor() 方法,使用游标迭代处理数据库记录,一次只执行单个查询,在处理大批量数据时,cursor() 方法可大幅减少内存的消耗。

foreach (Flight::where('foo', 'bar')->cursor() as $flight) {
    //
}

现在,我们使用 cursor() 游标重写上面的 chunk() 错误示例。

// 获取待处理的记录
$res = DB::connection('mysql_pms')->table('pms_product_app_map as m')
    ->selectRaw('m.*')
    ->leftJoin('pms_product_push_record as r', function ($join) {
        $join->on('r.app_id', '=', 'm.app_id')
             ->on('r.product_code', '=', 'm.product_code');
    })
    ->whereNull('r.id')  // 过滤掉已经推送成功的商品
    ->orderBy('m.id', 'asc');

$count = $res->count();  // 记录总数
$count_success = [];  // 成功的记录

foreach ($res->cursor() as $record) {
    try {
        // todo

        // 推送成功
        // 记录推送成功的记录
        $data = [
            'product_id'   => $record->product_id,
            'updated_at'   => Carbon::now()->toDateTimeString()
        ];
        DB::connection('mysql_pms')->table('pms_product_push_record')->updateOrInsert(['app_id'=>$record->app_id, 'product_code'=>$record->product_code], $data);
        $count_success[] = $record->id;
    } catch (\Exception $e) {
        // todo
    }
}

版权声明:本文为lamp_yang_3533原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。