应对5000并发是一个复杂的系统工程问题,需要从多个层面进行优化。以下是Java和Python项目的详细优化方案:
1. 架构层面优化
负载均衡
# Nginx configuration example
# Least-connections balancing across three app servers; a server is taken
# out of rotation for 30 s after 3 consecutive failed attempts.
upstream backend {
    least_conn;
    server 192.168.1.10:8080 max_fails=3 fail_timeout=30s;
    server 192.168.1.11:8080 max_fails=3 fail_timeout=30s;
    server 192.168.1.12:8080 max_fails=3 fail_timeout=30s;
}
server {
    listen 80;
    location / {
        proxy_pass http://backend;
        # Forward the original host and client IP so the upstream app
        # can log / rate-limit on the real client, not the proxy.
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }
}
微服务架构
// Spring Cloud Gateway路由配置
@Configuration
public class GatewayConfig {
@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
return builder.routes()
.route("user-service", r -> r.path("/api/users/**")
.filters(f -> f.stripPrefix(1))
.uri("lb://user-service"))
.route("order-service", r -> r.path("/api/orders/**")
.filters(f -> f.stripPrefix(1))
.uri("lb://order-service"))
.build();
}
}
2. Java项目优化
线程池优化
@Configuration
public class ThreadPoolConfig {

    /**
     * Shared business thread pool for high-concurrency request handling.
     *
     * The original snippet used Guava's ThreadFactoryBuilder solely for
     * thread naming; a plain JDK ThreadFactory achieves the same names
     * ("biz-pool-0", "biz-pool-1", ...) without the extra dependency.
     */
    @Bean("businessExecutor")
    public ExecutorService businessExecutor() {
        ThreadFactory namedFactory = new ThreadFactory() {
            private final AtomicInteger seq = new AtomicInteger();

            @Override
            public Thread newThread(Runnable task) {
                return new Thread(task, "biz-pool-" + seq.getAndIncrement());
            }
        };
        return new ThreadPoolExecutor(
                200,                              // core pool size
                400,                              // maximum pool size
                60L, TimeUnit.SECONDS,            // keep-alive for idle non-core threads
                new LinkedBlockingQueue<>(1000),  // bounded queue caps memory under bursts
                namedFactory,
                // When pool + queue are saturated, run on the caller's thread:
                // provides natural back-pressure instead of dropping tasks.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
异步处理
@Service
public class UserService {

    // The original snippet referenced these fields without declaring them,
    // so it did not compile; constructor injection makes them explicit.
    private final UserRepository userRepository;
    private final NotificationService notificationService;

    public UserService(UserRepository userRepository,
                       NotificationService notificationService) {
        this.userRepository = userRepository;
        this.notificationService = notificationService;
    }

    /**
     * Persists a user on the "businessExecutor" pool, then sends a welcome
     * e-mail, returning the saved entity as a completed future.
     *
     * NOTE(review): @Async + @Transactional on the same method means the
     * transaction runs on the async thread, and neither annotation takes
     * effect on self-invocation — callers must invoke through the Spring
     * proxy. Confirm this matches the intended semantics.
     */
    @Async("businessExecutor")
    @Transactional
    public CompletableFuture<User> createUser(User user) {
        User savedUser = userRepository.save(user);
        // Sends before the transaction commits; an exception here rolls the
        // save back as well.
        notificationService.sendWelcomeEmail(savedUser.getEmail());
        return CompletableFuture.completedFuture(savedUser);
    }
}
连接池优化
@Configuration
public class DataSourceConfig {
/**
 * HikariCP data source tuned for high concurrency.
 *
 * NOTE(review): @ConfigurationProperties binds spring.datasource.hikari.*
 * properties onto the returned HikariDataSource AFTER this method runs, so
 * any externally configured property overrides the hard-coded values below.
 * Confirm which source of truth is intended.
 */
@Bean
@ConfigurationProperties(prefix = "spring.datasource.hikari")
public HikariDataSource dataSource() {
HikariConfig config = new HikariConfig();
config.setMaximumPoolSize(100);      // upper bound on pooled connections
config.setMinimumIdle(10);           // idle connections kept warm
config.setConnectionTimeout(30000);  // ms to wait for a free connection
config.setIdleTimeout(600000);       // ms before an idle connection is retired
config.setMaxLifetime(1800000);      // ms before any connection is recycled
config.setLeakDetectionThreshold(60000); // log connections held > 60 s
return new HikariDataSource(config);
}
}
3. Python项目优化
使用异步框架
# FastAPI + Uvicorn configuration
from fastapi import FastAPI
import uvicorn
from starlette.middleware.cors import CORSMiddleware

app = FastAPI()

# CORS configuration.
# NOTE(review): with allow_credentials=True, Starlette echoes the request
# Origin instead of sending "*"; tighten allow_origins for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # Async database query.
    # NOTE(review): `database` is not defined in this snippet — presumably a
    # module-level databases/asyncpg handle; also confirm the "$1"
    # positional placeholder matches the driver in use.
    user = await database.fetch_one(
        "SELECT * FROM users WHERE id = $1", user_id
    )
    return user

if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        workers=8,  # roughly 2x CPU cores
        backlog=2048,
        limit_concurrency=1000,  # per worker; 8 workers => ~8000 total
        timeout_keep_alive=65
    )
数据库连接池
# SQLAlchemy async engine / session configuration
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# NOTE(review): credentials are hard-coded here — load from env/secrets
# in real deployments.
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/db"

engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,        # persistent connections kept in the pool
    max_overflow=10,     # extra burst connections (30 max total)
    pool_pre_ping=True,  # validate connections before use; drops stale ones
    pool_recycle=3600,   # recycle connections older than one hour
    echo=False           # no per-statement SQL logging
)

AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False  # keep ORM objects readable after commit in async code
)
4. 缓存策略
Redis缓存
@Component
public class CacheService {

    // The original injected a StringRedisTemplate that was never used:
    // @Cacheable/@CacheEvict go through the configured CacheManager, not a
    // template, so the unused field is dropped. The repository the methods
    // actually call was referenced but never declared; it is injected here.
    private final UserRepository userRepository;

    public CacheService(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    /**
     * Read-through lookup: serves from the "users" cache, loading from the
     * repository on a miss; null results are not cached.
     */
    @Cacheable(value = "users", key = "#userId", unless = "#result == null")
    public User getUser(Long userId) {
        return userRepository.findById(userId).orElse(null);
    }

    /**
     * Persists the user and evicts the stale cache entry so the next read
     * reloads fresh data.
     */
    @CacheEvict(value = "users", key = "#userId")
    public void updateUser(Long userId, User user) {
        userRepository.save(user);
    }
}
import aioredis  # NOTE(review): aioredis is deprecated; redis>=4.2 ships redis.asyncio
import json
from functools import wraps


class RedisCache:
    """Thin async wrapper around Redis for JSON-serializable values."""

    def __init__(self, host='localhost', port=6379):
        # Connection is established lazily on first command.
        self.redis = aioredis.from_url(f"redis://{host}:{port}")

    async def cache_get(self, key):
        """Return the deserialized value for *key*, or None on a miss."""
        value = await self.redis.get(key)
        return json.loads(value) if value else None

    async def cache_set(self, key, value, expire=3600):
        """Store *value* as JSON with a TTL of *expire* seconds."""
        await self.redis.setex(key, expire, json.dumps(value))


# Module-level cache instance. BUG FIX: the original decorator referenced a
# global `cache` that was never defined, so every decorated call raised
# NameError at runtime.
cache = RedisCache()


def cached(ttl=3600):
    """Decorator caching an async function's JSON-serializable result in Redis."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # repr-based key: requires args with stable str() output. A cached
            # value of None is indistinguishable from a miss and is recomputed.
            cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
            result = await cache.cache_get(cache_key)
            if result is None:
                result = await func(*args, **kwargs)
                await cache.cache_set(cache_key, result, ttl)
            return result
        return wrapper
    return decorator
5. 性能监控
Java监控
@Aspect // BUG FIX: @Around advice is only woven when the bean is an aspect;
        // the original declared just @Component, so the timer never ran.
@Component
public class PerformanceMonitor {

    private final MeterRegistry meterRegistry;

    public PerformanceMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    /**
     * Times any method annotated with @Measured, publishing a Micrometer
     * timer named after the method's signature name; failing invocations
     * are recorded with an error=true tag and the exception is rethrown.
     */
    @Around("@annotation(measured)")
    public Object monitorExecutionTime(ProceedingJoinPoint joinPoint,
                                       Measured measured) throws Throwable {
        Timer.Sample sample = Timer.start(meterRegistry);
        try {
            Object result = joinPoint.proceed();
            sample.stop(Timer.builder(joinPoint.getSignature().getName())
                    .register(meterRegistry));
            return result;
        } catch (Exception e) {
            sample.stop(Timer.builder(joinPoint.getSignature().getName())
                    .tag("error", "true")
                    .register(meterRegistry));
            throw e;
        }
    }
}
Python监控
import time
import asyncio  # kept from the original snippet; not used directly here
from functools import wraps  # BUG FIX: @wraps was used without this import (NameError)
from prometheus_client import Counter, Histogram, start_http_server

REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP Requests')
REQUEST_LATENCY = Histogram('request_latency_seconds', 'Request latency')


def monitor_request(func):
    """Decorator counting requests and observing latency for an async handler."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        REQUEST_COUNT.inc()
        start_time = time.time()
        try:
            # Latency is recorded on success and failure alike, so the two
            # duplicated observe() calls collapse into one finally block.
            return await func(*args, **kwargs)
        finally:
            REQUEST_LATENCY.observe(time.time() - start_time)
    return wrapper


# Expose /metrics for Prometheus scraping on port 8001.
start_http_server(8001)
6. JVM/Python优化参数
JVM优化
# JVM startup flags
-Xms4g -Xmx4g                     # heap size (equal min/max avoids resize pauses)
-XX:MetaspaceSize=256m            # initial metaspace threshold
-XX:+UseG1GC                      # G1 garbage collector
-XX:MaxGCPauseMillis=200          # GC pause-time target, ms
-XX:ParallelGCThreads=8           # stop-the-world GC worker threads
-XX:ConcGCThreads=4               # concurrent-phase GC threads
-XX:+HeapDumpOnOutOfMemoryError   # write a heap dump on OOM for post-mortem
Python优化
# Python environment variables
export PYTHONASYNCIODEBUG=0
# NOTE(review): UVLOOP_NOINLINE is not a documented uvloop setting — verify
# against the uvloop version in use before relying on it.
export UVLOOP_NOINLINE=1
# WARNING: -OO level optimization strips assert statements AND docstrings;
# libraries that rely on either will misbehave. Confirm this is acceptable.
export PYTHONOPTIMIZE=2
7. 数据库优化
查询优化
-- Create a composite index matching the filter + sort of the query below
CREATE INDEX idx_users_status_created ON users(status, created_at DESC);
-- Paginated query; the index serves both the WHERE and the ORDER BY
SELECT * FROM users
WHERE status = 'active'
ORDER BY created_at DESC
LIMIT 20 OFFSET 0;
-- Avoid N+1 queries:
-- use JOINs or batched lookups instead of one query per row
8. 压力测试
JMeter测试脚本
<!-- jmeter_test_plan.jmx -->
<hashTree>
<!-- 5000 virtual users ramped up over 300 s, held for a 3600 s run -->
<ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup">
<stringProp name="ThreadGroup.num_threads">5000</stringProp>
<stringProp name="ThreadGroup.ramp_time">300</stringProp>
<boolProp name="ThreadGroup.scheduler">true</boolProp>
<stringProp name="ThreadGroup.duration">3600</stringProp>
</ThreadGroup>
</hashTree>
Locust测试
from locust import HttpUser, task, between

class WebsiteUser(HttpUser):
    """Locust load profile: reads and writes against the users API."""

    # Each simulated user pauses 1-3 s between tasks.
    wait_time = between(1, 3)

    @task
    def get_users(self):
        # Weight 1: list users.
        self.client.get("/api/users")

    @task(3)
    def create_user(self):
        # Weight 3: runs ~3x as often as get_users.
        self.client.post("/api/users", json={
            "name": "test",
            "email": "test@example.com"
        })
关键要点总结
- 水平扩展:使用负载均衡和集群部署
- 异步处理:减少线程阻塞,提高吞吐量
- 缓存策略:合理使用Redis等缓存中间件
- 连接池:优化数据库和HTTP连接池
- 监控告警:实时监控系统性能指标
- 压力测试:持续进行性能测试和优化
通过这些综合措施,可以有效支撑5000并发请求。实际应用中需要根据具体业务场景进行调整和优化。
云小栈