并发处理

后台处理

zZNtmjxVpIoXvUEQtl7EZD7NW1ElVIww9GcMIg==著作权归作者所有,未经许可,禁止转载、复制此文档的任何内容。
<?php
namespace addons\mydemo\controller;

use think\Db;
use think\Exception;
use addons\mydemo\library\CustomException;
use think\addons\Controller;

class Index extends Controller
{
    public function test()
    {
        if($this->request->isPost()){
                // 开始事务
                Db::startTrans();
                try {
                    // 使用FOR UPDATE锁定查询,防止并发
                    $exists = \addons\mydemo\model\Test::where('user_id', $user_id)
                        ->whereTime('createtime', 'today')
                        ->lock(true)
                        ->find();

                    if ($exists) {
                        Db::rollback();
                        throw new CustomException('请勿重复写入!');
                    }

                    // 创建记录
                    \addons\mydemo\model\Test::create(['user_id' => $user_id, 'createtime' => time()]);
                    // 提交事务
                    Db::commit();
                } catch (CustomException $e) {
                    Db::rollback();
                    $this->error($e->getMessage());
                } catch (Exception $e) {
                    Db::rollback();
                    $this->error('操作失败,请稍后重试');
                }
                $this->success('操作成功');
        }
    }
}
IVezTBRblydLUZWSqfu4olG5dNgZwyi9zWwmHw==著作权归作者所有,未经许可,禁止转载、复制此文档的任何内容。

CustomException.php代码请点击查看详情

pnwMBFmhSbdoVeXauFyF4FIGvrrnwz8J7983kA==著作权归作者所有,未经许可,禁止转载、复制此文档的任何内容。
查看详情
<?php

namespace addons\mydemo\library;

class CustomException extends \Exception
{

}
DxtSVuHYKCdfnrebXHTsiS3ROPePnLX4cy2Plw==著作权归作者所有,未经许可,禁止转载、复制此文档的任何内容。

并发测试

xOnbQekqScvrUKVpstXMe8EiJufqAJl0KgDTIg==著作权归作者所有,未经许可,禁止转载、复制此文档的任何内容。

如需进行并发测试,请使用以下python运行脚本进行测试。将以下代码保存为test.py
并修改相应配置,然后使用python test.py测试。

pwxXMzUQBWtKqGaCYg61LhwJBD9oMTq8atcG4w==著作权归作者所有,未经许可,禁止转载、复制此文档的任何内容。

test.py代码请点击查看详情

dvMwSzUolWJfbZOQSvm5aCwf8kfudb+Ou07pgw==著作权归作者所有,未经许可,禁止转载、复制此文档的任何内容。
查看详情
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

# === 配置参数 ===
URL = "http://www.example.com/addons/mydemo/index/test"  # 替换为你的测试地址
TOTAL_REQUESTS = 10         # 总请求数
CONCURRENCY_LEVEL = 5       # 并发数(同时执行的线程数)

# 定义一个Token数组字典
tokenArr = [
    "3cf118b3-207f-44d1-8197-a22ade1c1db3",
    "c060e2fe-5e13-4b8c-9f84-24f2cfb56700",
]
def send_request(i, data):
    """发送单个HTTP请求并返回耗时"""
    try:
        start_time = time.time()
        response = requests.post(URL, data=data)  # 使用动态传入的参数
        print(response.text)
        elapsed = time.time() - start_time
        return {
            'id': i,
            'status_code': response.status_code,
            'time': elapsed
        }
    except Exception as e:
        return {
            'id': i,
            'error': str(e)
        }

def run_concurrent_test():
    results = []
    with ThreadPoolExecutor(max_workers=CONCURRENCY_LEVEL) as executor:
        # 生成不同的参数集合,并判断 tokenArr[i] 是否存在
        params_list = [
            {"id": "2", "token": tokenArr[i] if i < len(tokenArr) else ""} for i in range(TOTAL_REQUESTS)
        ]
        
        futures = [executor.submit(send_request, i, params_list[i]) for i in range(TOTAL_REQUESTS)]

        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            print(f"Request {result['id']} completed: {result.get('status_code', 'ERROR')} | Time: {result['time']:.4f}s")

    return results

def analyze_results(results):
    times = [r['time'] for r in results if 'time' in r]
    success_count = len(times)
    total_time = sum(times)
    avg_time = total_time / success_count if success_count else 0
    max_time = max(times) if times else 0
    min_time = min(times) if times else 0

    print("\n=== 测试结果分析 ===")
    print(f"总请求数: {len(results)}")
    print(f"成功请求数: {success_count}")
    print(f"平均响应时间: {avg_time:.4f} 秒")
    print(f"最快响应时间: {min_time:.4f} 秒")
    print(f"最慢响应时间: {max_time:.4f} 秒")

if __name__ == "__main__":
    start = time.time()
    results = run_concurrent_test()
    duration = time.time() - start
    print(f"总耗时: {duration:.2f} 秒")
    analyze_results(results)
AFemfYwODCqtjbQL6ZLsSY4wP7wvr6wLnnhD1A==著作权归作者所有,未经许可,禁止转载、复制此文档的任何内容。

如需测试多个不同用户同时并发,请将tokenArr中的值修改为对应用户的tokenparams_list为POST请求的参数。

NTUkfcotMyJBhZwne/soMumehWDYMM2iS29kww==著作权归作者所有,未经许可,禁止转载、复制此文档的任何内容。

常见问题

vJBwANqHYmcWjMGQdwxEbegnsGgk2Ev/HHO72Q==著作权归作者所有,未经许可,禁止转载、复制此文档的任何内容。

1、MySQL中的数据表只有Innodb才支持事务。

文档最后更新时间:2025-07-10 11:58:26
著作权归应用插件开发者所有,未经许可,禁止转载、复制此文档的任何内容。

文档
目录

深色
模式

切换
宽度