Yii2 高并发异步日志实战指南 (Redis 方案)

supervisor redis yii2 Yii2 · best · 于 1天前 发布 · 16 次阅读

Yii2 高并发异步日志实战指南 (Redis 方案)

本文档提供一套生产级的 Yii2 异步日志解决方案。

在高并发场景下,简单的 Redis RPUSHBLPOP 并不足以应对生产环境的复杂性。我们需要考虑:

  1. 极速生产:使用批量推送减少网络 RTT。
  2. 降级策略:Redis 挂了怎么办?
  3. 高效消费:批量刷盘,减少磁盘 IOPS。
  4. 优雅退出:防止消费者被 kill 时丢日志。

1. 生产者端优化 (Robust RedisTarget)

我们实现一个支持降级批量推送RedisTarget

1.1 代码实现 (components/AsyncRedisTarget.php)

namespace app\components;

use Yii;
use yii\log\Target;
use yii\base\InvalidConfigException;
use yii\log\FileTarget;

class AsyncRedisTarget extends Target
{
    public $redis = 'redis';
    public $key = 'queue:app:logs';
    
    // 降级配置:当 Redis 不可用时,回退到文件日志
    public $fallbackLogFile = '@runtime/logs/async_fallback.log';

    public function export()
    {
        try {
            $this->pushToRedis();
        } catch (\Exception $e) {
            // Redis 故障,降级到本地文件,避免丢日志或卡死业务
            $this->fallbackToFile($e);
        }
    }

    protected function pushToRedis()
    {
        $redis = Yii::$app->get($this->redis);
        $payloads = [];

        foreach ($this->messages as $message) {
            $payloads[] = json_encode([
                'l' => $message[1], // level (简写省流量)
                'c' => $message[2], // category
                't' => $message[3], // time
                'm' => $message[0], // message
                'r' => Yii::$app->params['request_id'] ?? '', // request_id
            ], JSON_UNESCAPED_UNICODE);
        }

        if (empty($payloads)) {
            return;
        }

        // 核心优化:使用可变参数一次性 RPUSH,只有 1 次网络 RTT
        // 也就是 RPUSH key val1 val2 val3 ...
        $redis->rpush($this->key, ...$payloads);
    }

    protected function fallbackToFile($exception)
    {
        // 简单的兜底写入
        $text = date('Y-m-d H:i:s') . " [FALLBACK] Redis Error: " . $exception->getMessage() . "\n";
        foreach ($this->messages as $message) {
            $text .= date('Y-m-d H:i:s', $message[3]) . " " . $message[0] . "\n";
        }
        file_put_contents(Yii::getAlias($this->fallbackLogFile), $text, FILE_APPEND);
    }
}

1.2 Web 配置 (config/web.php)

'log' => [
    'flushInterval' => 100, // 内存积攒 100 条(或请求结束)才推一次 Redis
    'targets' => [
        [
            'class' => 'app\components\AsyncRedisTarget',
            'exportInterval' => 100, // 达到 100 条触发 export
            'levels' => ['info', 'warning', 'error'],
            'key' => 'queue:app:logs',
        ],
    ],
],

2. 消费者端优化 (Bulk Consumer)

我们需要一个既能利用 BLPOP 实时性,又能合并写入减少磁盘 I/O 的消费者。

2.1 架构策略

  • Buffer 机制:内存中开辟一个 buffer,积攒到 N 条或 T 秒(如 1000 条或 3 秒)强制刷盘。
  • PCNTL 信号处理:捕获 SIGTERM / SIGINT,在退出前强制把 Buffer 刷入磁盘。
  • 批量写入:使用 file_put_contents 一次性追加几 MB 数据,而不是一行行写。

2.2 代码实现 (commands/AsyncLogController.php)

namespace app\commands;

use Yii;
use yii\console\Controller;

class AsyncLogController extends Controller
{
    private $buffer = [];
    private $batchSize = 1000;    // 批量刷盘阈值 (条)
    private $flushInterval = 3;   // 批量刷盘时间间隔 (秒)
    private $lastFlushTime = 0;
    private $running = true;

    // 目标日志文件路径映射
    private $logFiles = [
        'payment' => '@runtime/logs/payment.log',
        'default' => '@runtime/logs/app.log',
    ];

    public function init()
    {
        parent::init();
        // 注册信号处理,优雅退出
        if (extension_loaded('pcntl')) {
            pcntl_signal(SIGTERM, [$this, 'signalHandler']);
            pcntl_signal(SIGINT, [$this, 'signalHandler']);
        }
    }

    public function signalHandler($signal)
    {
        echo "Signal received, flushing and exiting...\n";
        $this->running = false;
    }

    public function actionIndex()
    {
        echo "Async Log Consumer Started (PID: " . getmypid() . ")...\n";
        $redis = Yii::$app->redis;
        $this->lastFlushTime = time();

        while ($this->running) {
            // 信号分发
            if (extension_loaded('pcntl')) {
                pcntl_signal_dispatch();
            }

            // 1. 尝试获取日志 (带超时 1s,方便处理 buffer flush)
            try {
                $data = $redis->blpop('queue:app:logs', 1);
            } catch (\Exception $e) {
                echo "Redis Error: " . $e->getMessage() . "\n";
                sleep(1);
                continue;
            }

            // 2. 放入 Buffer
            if ($data) {
                $this->buffer[] = json_decode($data[1], true);
            }

            // 3. 检查是否需要刷盘
            if ($this->shouldFlush()) {
                $this->flush();
            }
        }
        
        // 退出前的最后一次刷盘
        $this->flush();
        echo "Bye.\n";
    }

    private function shouldFlush()
    {
        if (empty($this->buffer)) {
            return false;
        }
        // 规则:条数够了 OR 时间到了
        return count($this->buffer) >= $this->batchSize 
            || (time() - $this->lastFlushTime) >= $this->flushInterval;
    }

    private function flush()
    {
        if (empty($this->buffer)) return;

        // 按分类聚合内容 (减少文件打开次数)
        $fileContents = [];

        foreach ($this->buffer as $log) {
            $cat = $log['c'];
            // 简单的分类映射逻辑,可根据实际需求复杂化
            $targetAlias = isset($this->logFiles[$cat]) ? $this->logFiles[$cat] : $this->logFiles['default'];
            $file = Yii::getAlias($targetAlias);

            if (!isset($fileContents[$file])) {
                $fileContents[$file] = '';
            }

            // 格式化日志行
            $time = date('Y-m-d H:i:s', $log['t']);
            $fileContents[$file] .= "{$time} [{$log['l']}] [{$log['c']}] [{$log['r']}] {$log['m']}\n";
        }

        // 批量写入磁盘
        foreach ($fileContents as $file => $content) {
            // 使用文件锁,保证原子追加
            file_put_contents($file, $content, FILE_APPEND | LOCK_EX);
        }

        echo "Flushed " . count($this->buffer) . " logs to disk.\n";
        
        // 重置状态
        $this->buffer = [];
        $this->lastFlushTime = time();
    }
}

3. 部署与运维

3.1 Supervisor 配置

必须使用 Supervisor 守护消费者进程,并配置 stopasgroup 确保信号能正确传递。

[program:yii-async-log]
command=php /path/to/yii async-log/index
process_name=%(program_name)s_%(process_num)02d
numprocs=1
autostart=true
autorestart=true
; 确保发送 SIGTERM 给进程,让它有机会执行 flush
stopsignal=TERM
stopwaitsecs=10

3.2 监控

  • Redis 队列长度监控:如果 queue:app:logs 持续增长,说明消费者处理不过来或挂了。
  • Fallback 文件监控:如果 runtime/logs/async_fallback.log 有大小,说明 Redis 出了故障。

总结

通过这套方案,我们实现了:

  1. Web 端极速响应:仅需 1 次 Redis 网络 IO。
  2. 高可靠性:Redis 挂了自动降级写文件。
  3. 磁盘 I/O 优化:将数千次随机写合并为几次批量顺序写。
  4. 数据完整性:优雅退出机制保证不丢数据。

本文由 best 创作,采用 知识共享署名 3.0 中国大陆许可协议 进行许可。 可自由转载、引用,但需署名作者且注明文章出处。

共收到 0 条回复
没有找到数据。
添加回复 (需要登录)
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册