并发处理
后台处理
<?php
namespace addons\mydemo\controller;
use think\Db;
use think\Exception;
use addons\mydemo\library\CustomException;
use think\addons\Controller;
class Index extends Controller
{
public function test()
{
if($this->request->isPost()){
// 开始事务
Db::startTrans();
try {
// 使用FOR UPDATE锁定查询,防止并发
$exists = \addons\mydemo\model\Test::where('user_id', $user_id)
->whereTime('createtime', 'today')
->lock(true)
->find();
if ($exists) {
Db::rollback();
throw new CustomException('请勿重复写入!');
}
// 创建记录
\addons\mydemo\model\Test::create(['user_id' => $user_id, 'createtime' => time()]);
// 提交事务
Db::commit();
} catch (CustomException $e) {
Db::rollback();
$this->error($e->getMessage());
} catch (Exception $e) {
Db::rollback();
$this->error('操作失败,请稍后重试');
}
$this->success('操作成功');
}
}
}CustomException.php代码请点击查看详情
查看详情
<?php
namespace addons\mydemo\library;
class CustomException extends \Exception
{
}并发测试
如需进行并发测试,请使用以下python运行脚本进行测试。将以下代码保存为test.py
并修改相应配置,然后使用python test.py测试。
test.py代码请点击查看详情
查看详情
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
# === 配置参数 ===
URL = "http://www.example.com/addons/mydemo/index/test" # 替换为你的测试地址
TOTAL_REQUESTS = 10 # 总请求数
CONCURRENCY_LEVEL = 5 # 并发数(同时执行的线程数)
# 定义一个Token数组字典
tokenArr = [
"3cf118b3-207f-44d1-8197-a22ade1c1db3",
"c060e2fe-5e13-4b8c-9f84-24f2cfb56700",
]
def send_request(i, data):
"""发送单个HTTP请求并返回耗时"""
try:
start_time = time.time()
response = requests.post(URL, data=data) # 使用动态传入的参数
print(response.text)
elapsed = time.time() - start_time
return {
'id': i,
'status_code': response.status_code,
'time': elapsed
}
except Exception as e:
return {
'id': i,
'error': str(e)
}
def run_concurrent_test():
results = []
with ThreadPoolExecutor(max_workers=CONCURRENCY_LEVEL) as executor:
# 生成不同的参数集合,并判断 tokenArr[i] 是否存在
params_list = [
{"id": "2", "token": tokenArr[i] if i < len(tokenArr) else ""} for i in range(TOTAL_REQUESTS)
]
futures = [executor.submit(send_request, i, params_list[i]) for i in range(TOTAL_REQUESTS)]
for future in as_completed(futures):
result = future.result()
results.append(result)
print(f"Request {result['id']} completed: {result.get('status_code', 'ERROR')} | Time: {result['time']:.4f}s")
return results
def analyze_results(results):
times = [r['time'] for r in results if 'time' in r]
success_count = len(times)
total_time = sum(times)
avg_time = total_time / success_count if success_count else 0
max_time = max(times) if times else 0
min_time = min(times) if times else 0
print("\n=== 测试结果分析 ===")
print(f"总请求数: {len(results)}")
print(f"成功请求数: {success_count}")
print(f"平均响应时间: {avg_time:.4f} 秒")
print(f"最快响应时间: {min_time:.4f} 秒")
print(f"最慢响应时间: {max_time:.4f} 秒")
if __name__ == "__main__":
start = time.time()
results = run_concurrent_test()
duration = time.time() - start
print(f"总耗时: {duration:.2f} 秒")
analyze_results(results)如需测试多个不同用户同时并发,请将tokenArr中的值修改为对应用户的token,params_list为POST请求的参数。
常见问题
1、MySQL中的数据表只有Innodb才支持事务。
文档最后更新时间:2025-07-10 11:58:26
著作权归应用插件开发者所有,未经许可,禁止转载、复制此文档的任何内容。
未解决你的问题?请到「问答社区」反馈你遇到的问题