📦 新手导读:什么时候需要批量MR?
当你面对数百到上千个暴露(如代谢物、蛋白、菌群)时,手工逐个分析不可行,必须使用可追踪的批处理流程。
背景介绍
批量MR本质是“同一个结局 + 多个暴露”重复执行。核心难点不在算法,而在稳定性、日志追踪和结果汇总。
备注(统一三段)
适用人群:处理大规模暴露(代谢物/蛋白组)并希望自动化输出结果的人。
常见错误:全量直接并行开跑且不留错误日志,失败任务难以回溯复现。
论文写法:方法中应写明批次数、并行参数、失败重跑规则和多重校正方式。
适用场景
- 📦 暴露端是 1400 个代谢物 GWAS 文件(每个文件一个代谢物)
- 🎯 对同一个结局重复做 MR(如 CKD、CAD、T2D)
- 🧾 需要统一导出主结果、质控结果和失败任务日志
- ⚙️ 需要从串行过渡到并行,加速全量跑批
完整脚本(for循环1400模板 + 并行优化)
下面脚本可直接保存为 batch_mr_metabolites.R。通过 use_parallel 开关切换串行/并行。
library(data.table)
library(dplyr)
library(TwoSampleMR)
library(future)
library(furrr)
dir.create("./output", showWarnings = FALSE, recursive = TRUE)
# =========================
# 0) 输入文件
# =========================
metabolite_files <- list.files(
"./input/metabolites/",
pattern = "\\.tsv\\.gz$",
full.names = TRUE
)
metabolite_files <- metabolite_files[seq_len(min(1400, length(metabolite_files)))]
outcome_file <- "./input/outcome_standard.tsv.gz"
# TRUE: 并行;FALSE: 串行for循环
use_parallel <- TRUE
# =========================
# 1) 单个代谢物的MR函数
# =========================
run_one_mr <- function(file_i, outcome_file) {
trait_id <- tools::file_path_sans_ext(tools::file_path_sans_ext(basename(file_i)))
tryCatch({
# 1.1 暴露数据
exposure_dat <- read_exposure_data(
filename = file_i,
sep = "\t",
snp_col = "rsid",
beta_col = "beta",
se_col = "se",
effect_allele_col = "ea",
other_allele_col = "oa",
eaf_col = "eaf",
pval_col = "p",
samplesize_col = "n",
phenotype_col = "metabolite"
) %>%
filter(pval.exposure < 5e-8)
if (nrow(exposure_dat) == 0) stop("无显著工具变量(p<5e-8)")
# 1.2 LD clumping
exposure_dat <- clump_data(
exposure_dat,
clump_kb = 10000,
clump_r2 = 0.001,
clump_p1 = 5e-8,
clump_p2 = 1e-5
)
if (nrow(exposure_dat) < 3) stop("clump后IV数量不足3")
# 1.3 结局数据
outcome_dat <- read_outcome_data(
snps = exposure_dat$SNP,
filename = outcome_file,
sep = "\t",
snp_col = "rsid",
beta_col = "beta",
se_col = "se",
effect_allele_col = "ea",
other_allele_col = "oa",
eaf_col = "eaf",
pval_col = "p",
samplesize_col = "n"
)
# 1.4 协调 + MR
dat <- harmonise_data(exposure_dat, outcome_dat, action = 2) %>%
filter(mr_keep == TRUE)
if (nrow(dat) < 3) stop("协调后SNP不足3")
mr_res <- mr(
dat,
method_list = c("mr_ivw", "mr_weighted_median", "mr_egger_regression", "mr_raps")
) %>%
mutate(metabolite = trait_id)
het_res <- mr_heterogeneity(
dat,
method_list = c("mr_ivw", "mr_egger_regression")
) %>%
mutate(metabolite = trait_id, test = "heterogeneity")
pleio_res <- mr_pleiotropy_test(dat) %>%
mutate(metabolite = trait_id, test = "pleiotropy")
list(
ok = TRUE,
trait = trait_id,
mr = mr_res,
qc = bind_rows(het_res, pleio_res)
)
}, error = function(e) {
list(
ok = FALSE,
trait = trait_id,
error = e$message
)
})
}
# =========================
# 2) 批量执行(串行/并行)
# =========================
if (use_parallel) {
n_workers <- max(1, min(8, as.integer(availableCores()) - 1))
plan(multisession, workers = n_workers)
message("并行模式,workers = ", n_workers)
res_list <- future_map(
metabolite_files,
run_one_mr,
outcome_file = outcome_file,
.options = furrr_options(
seed = 20260305,
packages = c("data.table", "dplyr", "TwoSampleMR")
)
)
plan(sequential)
} else {
message("串行for循环模式")
res_list <- vector("list", length(metabolite_files))
for (i in seq_along(metabolite_files)) {
message(sprintf("[%d/%d] %s", i, length(metabolite_files), basename(metabolite_files[i])))
res_list[[i]] <- run_one_mr(metabolite_files[i], outcome_file)
}
}
# =========================
# 3) 结果汇总
# =========================
ok_list <- res_list[sapply(res_list, function(x) isTRUE(x$ok))]
fail_list <- res_list[!sapply(res_list, function(x) isTRUE(x$ok))]
mr_table <- bind_rows(lapply(ok_list, function(x) x$mr))
qc_table <- bind_rows(lapply(ok_list, function(x) x$qc))
err_table <- bind_rows(lapply(fail_list, function(x) {
data.frame(
metabolite = x$trait,
error = x$error,
stringsAsFactors = FALSE
)
}))
if (nrow(mr_table) > 0) {
mr_table <- mr_table %>%
group_by(method) %>%
mutate(p_adj_fdr = p.adjust(pval, method = "fdr")) %>%
ungroup()
}
# =========================
# 4) 导出
# =========================
fwrite(mr_table, "./output/batch_mr_results_1400.tsv", sep = "\t")
fwrite(qc_table, "./output/batch_qc_results_1400.tsv", sep = "\t")
fwrite(err_table, "./output/batch_error_log_1400.tsv", sep = "\t")
message("完成:成功 ", nrow(mr_table), " 条MR结果;失败 ", nrow(err_table), " 个代谢物任务。")并行提速实战建议
- 🚀 先用 20 个代谢物试跑,确认字段映射和 harmonise 方向无误,再上全量。
- 🚀 机器内存 ≤ 32GB 时,建议 workers = 4~6,避免频繁 swap。
- 🚀 结局文件尽量使用压缩 + 索引格式,减少重复读取开销。
- 🚀 对 1400 全量任务建议分批(如每批 200 个),失败批次单独重跑。
- 🚀 固定随机种子(seed)并保留错误日志,保证可复现性。