Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 824|回复: 0

高并发简单解决方案————redis队列缓存+mysql 批量入库(ThinkPhP)

[复制链接]
  • TA的每日心情
    奋斗
    2024-4-6 11:05
  • 签到天数: 748 天

    [LV.9]以坛为家II

    2034

    主题

    2092

    帖子

    70万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    705612
    发表于 2021-6-30 16:18:40 | 显示全部楼层 |阅读模式

    问题分析

    • 问题一:要求日志最好入库;但是,直接入库mysql确实扛不住,批量入库没有问题,done。【批量入库和直接入库性能差异】
    • 问题二:批量入库就需要有高并发的消息队列,决定采用redis list 仿真实现,而且方便回滚。
    • 问题三:日志量毕竟大,保存最近30条足矣,决定用php写个离线统计和清理脚本。

    一、设计数据库表和存储

    • 考虑到log系统对数据库的性能更多一些,稳定性和安全性没有那么高,存储引擎自然是只支持select insert 没有索引的archive。如果确实有update需求,也可以采用myISAM。
    • 考虑到log是实时记录的所有数据,数量可能巨大,主键采用bigint,自增即可
    • 考虑到log系统以写为主,统计采用离线计算,字段均不要出现索引,因为一方面可能会影响插入数据效率,另外读时候会造成死锁,影响写数据。

    二、redis存储数据形成消息队列

    复制代码
     /** * 使用队列生成reids测试数据 * 成功:执行 RPUSH操作后,返回列表的长度:8 */ public function createRedisList($listKey = 'message01') { $redis = RedisInstance::MasterInstance(); $redis->select(1); $message = [ 'type' => 'say', 'userId' => $redis->incr('user_id'), 'userName' => 'Tinywan' . mt_rand(100, 9999), //是否正在录像 'userImage' => '/res/pub/user-default-w.png', //是否正在录像 'openId' => 'openId' . mt_rand(100000, 9999999999999999), 'roomId' => 'openId' . mt_rand(30, 50), 'createTime' => date('Y-m-d H:i:s', time()), 'content' => $redis->incr('content') //当前是否正在打流状态  ]; $rPushResul = $redis->rPush($listKey, json_encode($message)); //执行成功后返回当前列表的长度 9 return $rPushResul; }
    复制代码

    三、读取redis消息队列里面的数据,批量入库

    第一种思路:

    复制代码
     /** * 消息Redis方法保存到Mysql数据库 * @param string $liveKey */ public function RedisSaveToMysql($listKey = 'message01') { if (empty($listKey)) { $result = ["errcode" => 500, "errmsg" => "this parameter is empty!"]; exit(json_encode($result)); } $redis = RedisInstance::MasterInstance(); $redis->select(1); $redisInfo = $redis->lRange($listKey, 0, 5); $dataLength = $redis->lLen($listKey); $model = M("User"); while ($dataLength > 65970) { try { $model->startTrans(); $redis->watch($listKey); $arrList = []; foreach ($redisInfo as $key => $val) { $arrList[] = array( 'username' => json_decode($val, true)['userName'], 'logintime' => json_decode($val, true)['createTime'], 'description' => json_decode($val, true)['content'], 'pido' => json_decode($val, true)['content'] ); } $insertResult = $model->addAll($arrList); if (!$insertResult) { $model->rollback(); $result = array("errcode" => 500, "errmsg" => "Data Insert into Fail!", 'data' => 'dataLength:' . $dataLength); exit(json_encode($result)); } $model->commit(); $redis->lTrim($listKey, 6, -1); $redisInfo = $redis->lRange($listKey, 0, 5); $dataLength = $redis->lLen($listKey); } catch (Exception $e) { $model->rollback(); $result = array("errcode" => 500, "errmsg" => "Data Insert into Fail!"); exit(json_encode($result)); } } $result = array("errcode" => 200, "errmsg" => "Data Insert into Success!", 'data' => 'dataLength:' . $dataLength . 'liveKey:' . $listKey); exit(json_encode($result)); }
    复制代码

    第二种思路(供参考,非框架) 

    复制代码
    <?php
    $redis_xx = new Redis(); $redis_xx->connect('ip', port); $redis_xx->auth("password"); // 获取现有消息队列的长度 $count = 0; $max = $redis_xx->lLen("call_log"); // 获取消息队列的内容,拼接sql $insert_sql = "insert into fb_call_log (`interface_name`, `createtime`) values "; // 回滚数组 $roll_back_arr = array(); while ($count < $max) { $log_info = $redis_cq01->lPop("call_log"); $roll_back_arr = $log_info; if ($log_info == 'nil' || !isset($log_info)) { $insert_sql .= ";"; break; } // 切割出时间和info $log_info_arr = explode("%", $log_info); $insert_sql .= " ('" . $log_info_arr[0] . "','" . $log_info_arr[1] . "'),"; $count++; } // 判定存在数据,批量入库 if ($count != 0) { $link_2004 = mysql_connect('ip:port', 'user', 'password'); if (!$link_2004) { die("Could not connect:" . mysql_error()); } $crowd_db = mysql_select_db('fb_log', $link_2004); $insert_sql = rtrim($insert_sql, ",") . ";"; $res = mysql_query($insert_sql); // 输出入库log和入库结果; echo date("Y-m-d H:i:s") . "insert " . $count . " log info result:"; echo json_encode($res); echo "</br>\n"; // 数据库插入失败回滚 if (!$res) { foreach ($roll_back_arr as $k) { $redis_xx->rPush("call_log", $k); } } // 释放连接 mysql_free_result($res); mysql_close($link_2004); } $redis_cq01->close(); ?>
    复制代码

    四、获取Redis数据缓存数据

     

    复制代码
     /** * [0]检查当前Redis是否连接成功 * [1]获取数据,首先从Redis中去获取,没有的话再从数据库中去获取 */ public function findDataRedisOrMysql($listKey = 'message01') { //Check the current connection status 查看服务是否运行 if (RedisInstance::MasterInstance() != false) { $redis = RedisInstance::MasterInstance(); $redis->select(2); /** * 首先从Redis中去获取数据 * lRange 获取为空的话,则表示没有数据,否则返回一个非空数组 */ $redisData = $redis->lRange($listKey, 0, 9); $resultData = []; if (!empty($redisData)) { $resultData['status_code'] = 200; $resultData['msg'] = 'Data Source from Redis Cache'; foreach ($redisData as $key => $val) { $resultData['listData'][] = json_decode($val, true); } } else { $resultData['redis_msg'] = 'Redis is Expire'; $conditions = array('status' => ':status'); $mysqlData = M('User')->where($conditions)->bind(':status', 1, \PDO::PARAM_STR)->select(); if ($mysqlData) { $resultData['status_code'] = 200; $resultData['mysql_msg'] = 'Data Source from Mysql is Success'; $redis->select(2); foreach ($mysqlData as $key => $val) { $resultData['listData'][] = $val; //写入Redis作为缓存 $redis->rPush($listKey, json_encode($val)); } //同时设置一个过期时间 $redis->expire($listKey,30); } else { $resultData['status_code'] = 500; $resultData['mysql_msg'] = 'Data Source from Mysql is Fail'; } } } else { $resultData['redis_msg'] = 'Redis server went away'; $resultData['mysql_msg'] = 'Mysql Data2'; $conditions = array('status' => ':status'); $mysqlData = M('User')->where($conditions)->bind(':status', 1, \PDO::PARAM_STR)->select(); foreach ($mysqlData as $key => $val) { $resultData['listData'][] = $val; } } homePrint($resultData); }
    复制代码

     

    四、离线天级统计和清理数据脚本

    复制代码
    <?php
    /** * static log :每天离线统计代码日志和删除五天前的日志 * */ // 离线统计 $link_2004 = mysql_connect('ip:port', 'user', 'pwd'); if (!$link_2004) { die("Could not connect:" . mysql_error()); } $crowd_db = mysql_select_db('fb_log', $link_2004); // 统计昨天的数据 $day_time = date("Y-m-d", time() - 60 * 60 * 24 * 1); $static_sql = "get sql"; $res = mysql_query($static_sql, $link_2004); // 获取结果入库略 // 清理15天之前的数据 $before_15_day = date("Y-m-d", time() - 60 * 60 * 24 * 15); $delete_sql = "delete from xxx where createtime < '" . $before_15_day . "'"; try { $res = mysql_query($delete_sql); }catch(Exception $e){ echo json_encode($e)."\n"; echo "delete result:".json_encode($res)."\n"; } mysql_close($link_2004); ?>
    复制代码

    五:代码部署

    主要是部署,批量入库脚本的调用和天级统计脚本,crontab例行运行。

    # 批量入库脚本 */2 * * * * /home/cuihuan/xxx/lamp/php5/bin/php /home/cuihuan/xxx/batchLog.php >>/home/cuihuan/xxx/batchlog.log # 天级统计脚本 0 5 * * * /home/cuihuan/xxx/php5/bin/php /home/cuihuan/xxx/staticLog.php >>/home/cuihuan/xxx/staticLog.log

    总结:相对于其他复杂的方式处理高并发,这个解决方案简单有效:通过redis缓存抗压,mysql批量入库解决数据库瓶颈,离线计算解决统计数据,通过定期清理保证库的大小。

    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-5-2 16:26 , Processed in 0.072502 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表