import psycopg2
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from datetime import timedelta
from scipy.stats import iqr
from ruptures import Pelt
import plotly.express as px
import shap
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
# ======================
# 1. 数据库连接与数据提取
# ======================
def connect_mimic():
"""连接本地MIMIC-IV数据库"""
return psycopg2.connect(
dbname="mimic",
user="your_username",
password="your_password",
host="localhost",
port="5432"
)
def extract_sepsis_patients(conn):
"""提取脓毒症休克患者"""
query = """
WITH sepsis_patients AS (
SELECT subject_id, hadm_id, icustay_id
FROM diagnoses_icd
WHERE icd_code IN ('R6520','R6521','A403','A415')
UNION
SELECT subject_id, hadm_id, icustay_id
FROM sepsis3
WHERE sepsis3_type = 'septic_shock'
)
SELECT sp.*, s.suspicion_time
FROM sepsis_patients sp
JOIN sepsis3 s ON sp.icustay_id = s.icustay_id
"""
return pd.read_sql(query, conn)
def extract_phosphate_data(conn, icustay_ids):
"""提取磷酸盐实验室数据"""
query = f"""
WITH shock_times AS (
SELECT icustay_id, suspicion_time
FROM sepsis3
WHERE icustay_id IN ({','.join(map(str, icustay_ids))})
)
SELECT l.icustay_id,
l.charttime - s.suspicion_time AS relative_time,
l.valuenum AS phosphate
FROM labevents l
JOIN shock_times s ON l.icustay_id = s.icustay_id
WHERE l.itemid = 50822 -- 磷酸盐实验室项目ID
AND l.charttime BETWEEN s.suspicion_time - INTERVAL '72h'
AND s.suspicion_time + INTERVAL '72h'
"""
return pd.read_sql(query, conn)
# ======================
# 2. 特征工程模块
# ======================
class PhosphateFeatureEngineer:
def __init__(self, time_window=24):
self.time_window = timedelta(hours=time_window)
def compute_features(self, df):
"""计算磷酸盐动态特征"""
features = []
for patient_id, patient_data in df.groupby('icustay_id'):
patient_data = patient_data.sort_values('relative_time')
times = patient_data['relative_time'].dt.total_seconds() / 3600
values = patient_data['phosphate'].values
features.append({
'icustay_id': patient_id,
'mean_phosphate': np.mean(values),
'std_phosphate': np.std(values),
'iqr_phosphate': iqr(values),
'min_phosphate': np.min(values),
'max_phosphate': np.max(values),
'abnormal_low': (values < 0.8).mean(),
'abnormal_high': (values > 1.5).mean(),
'slope': self._calculate_slope(times, values),
'change_points': self._detect_change_points(values)
})
return pd.DataFrame(features)
def _calculate_slope(self, x, y):
if len(x) < 2:
return 0
return np.polyfit(x, y, 1)[0]
def _detect_change_points(self, values):
if len(values) < 5:
return 0
algo = Pelt(model="rbf").fit(values)
return len(algo.predict(pen=3)) - 1
class OrganFailureTracker:
def __init__(self, sofa_mapping):
self.sofa_mapping = sofa_mapping # 需要自定义SOFA评分映射
def track_failures(self, df):
"""跟踪多器官衰竭动态"""
failure_counts = []
for _, patient_data in df.groupby('icustay_id'):
patient_data = patient_data.sort_values('charttime')
patient_data['failure_count'] = (patient_data[['respiration', 'coagulation',
'liver', 'cardiovascular',
'cns', 'renal']] > 3).sum(axis=1)
failure_counts.append(patient_data[['relative_time', 'failure_count']])
return pd.concat(failure_counts)
# ======================
# 3. 机器学习模型
# ======================
class PhosphateTrajectoryModel(nn.Module):
def __init__(self):
super().__init__()
self.lstm = nn.LSTM(input_size=1, hidden_size=64, batch_first=True)
self.fc = nn.Sequential(
nn.Linear(64 + 9, 128), # 9个静态特征
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(128, 1)
)
def forward(self, time_series, static_features):
_, (hidden, _) = self.lstm(time_series.unsqueeze(-1))
combined = torch.cat([hidden.squeeze(0), static_features], dim=1)
return torch.sigmoid(self.fc(combined))
def train_model(model, X_time, X_static, y, epochs=50):
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
best_auc = 0
for epoch in range(epochs):
model.train()
for i in range(0, len(X_time), 32):
X_time_batch = X_time[i:i+32]
X_static_batch = X_static[i:i+32]
y_batch = y[i:i+32]
optimizer.zero_grad()
preds = model(X_time_batch, X_static_batch).squeeze()
loss = criterion(preds, y_batch)
loss.backward()
optimizer.step()
# 验证阶段
model.eval()
preds, labels = [], []
with torch.no_grad():
for i in range(0, len(X_time), 32):
X_time_batch = X_time[i:i+32]
X_static_batch = X_static[i:i+32]
y_batch = y[i:i+32]
preds.append(model(X_time_batch, X_static_batch).squeeze().cpu().numpy())
labels.append(y_batch.cpu().numpy())
auc = roc_auc_score(np.concatenate(labels), np.concatenate(preds))
if auc > best_auc:
best_auc = auc
torch.save(model.state_dict(), 'best_model.pth')
print(f"Epoch {epoch+1}, Val AUC: {auc:.4f}")
# ======================
# 4. 可视化分析
# ======================
def plot_organ_failure_heatmap(df):
pivot_df = df.pivot(index='icustay_id', columns='relative_time', values='failure_count')
fig = px.imshow(
pivot_df,
labels=dict(x="相对时间(小时)", y="患者ID", color="器官衰竭数量"),
title='多器官衰竭动态热力图'
)
fig.update_xaxes(dtick=6)
fig.write_image("organ_failure_heatmap.png")
def plot_feature_importance(model, X_static):
explainer = shap.DeepExplainer(model, X_static)
shap_values = explainer.shap_values(X_static)
shap.summary_plot(shap_values, X_static, show=False)
plt.gcf().set_size_inches(12, 8)
plt.title('特征重要性分析')
plt.savefig('feature_importance.png')
# ======================
# 5. 主执行流程
# ======================
if __name__ == "__main__":
# 配置SOFA评分映射(示例值,需根据实际情况调整)
SOFA_MAPPING = {
'respiration': {1: 0, 2: 1, 3: 2, 4: 3},
'coagulation': {np.nan: 0, 0: 0, 1: 1, 2: 2, 3: 3},
'liver': {np.nan: 0, 0: 0, 1: 1, 2: 2, 3: 3, 4: 4},
'cardiovascular': {0: 0, 1: 1, 2: 2, 3: 3, 4: 4},
'cns': {0: 0, 1: 1, 2: 2, 3: 3, 4: 4},
'renal': {0: 0, 1: 1, 2: 2, 3: 3, 4: 4}
}
# 1. 连接数据库
conn = connect_mimic()
# 2. 提取数据
patients = extract_sepsis_patients(conn)
phosphate_data = extract_phosphate_data(conn, patients['icustay_id'].unique())
# 3. 特征工程
engineer = PhosphateFeatureEngineer()
phosphate_features = engineer.compute_features(phosphate_data)
# 4. 器官衰竭跟踪(需要患者数据包含SOFA相关字段)
organ_tracker = OrganFailureTracker(SOFA_MAPPING)
failure_data = organ_tracker.track_failures(patients)
# 5. 合并数据集
full_data = pd.merge(phosphate_features, failure_data, on='icustay_id')
# 6. 准备机器学习数据
X_time = full_data['relative_time'].values.reshape(-1, 1, 1)
X_static = full_data[['mean_phosphate', 'std_phosphate', 'iqr_phosphate',
'abnormal_low', 'abnormal_high', 'slope',
'change_points', 'min_phosphate', 'max_phosphate']].values
y = (full_data['failure_count'] > 3).astype(int).values # 示例标签
# 7. 训练模型
model = PhosphateTrajectoryModel()
train_model(model, X_time, X_static, y)
# 8. 可视化分析
plot_organ_failure_heatmap(full_data)
plot_feature_importance(model, X_static)
print("分析流程执行完成!结果已保存至当前目录")
使用说明:
配置数据库连接参数(修改
connect_mimic()
中的用户名和密码)根据实际数据结构完善SOFA评分映射表(
SOFA_MAPPING
)安装依赖库:
bash
运行脚本将自动执行:
数据提取和预处理
磷酸盐动态特征计算
器官衰竭跟踪
深度学习模型训练
结果可视化保存
输出结果:
最佳模型权重:
best_model.pth
器官衰竭热力图:
organ_failure_heatmap.png
特征重要性图:
feature_importance.png
训练过程AUC指标输出
该整合版本实现了从原始数据到可视化结果的完整流水线,可通过调整SOFA映射表和特征工程参数适配具体研究需求。建议通过Jupyter Notebook分块调试后,再执行完整脚本。