返回MR专题首页

批量代谢物分析教程

1400种代谢物一键批量MR:for循环模板 + 并行提速 + 错误日志追踪

📦 新手导读:什么时候需要批量MR?

当你面对数百到上千个暴露(如代谢物、蛋白、菌群)时,手工逐个分析不可行,必须使用可追踪的批处理流程。

背景介绍

批量MR本质是“同一个结局 + 多个暴露”重复执行。核心难点不在算法,而在稳定性、日志追踪和结果汇总。

备注(统一三段)

适用人群:处理大规模暴露(代谢物/蛋白组)并希望自动化输出结果的人。
常见错误:全量直接并行开跑且不留错误日志,失败任务难以回溯复现。
论文写法:方法中应写明批次数、并行参数、失败重跑规则和多重校正方式。

适用场景

完整脚本(for循环1400模板 + 并行优化)

下面脚本可直接保存为 batch_mr_metabolites.R。通过 use_parallel 开关切换串行/并行。

library(data.table)
library(dplyr)
library(TwoSampleMR)
library(future)
library(furrr)

dir.create("./output", showWarnings = FALSE, recursive = TRUE)

# =========================
# 0) 输入文件
# =========================
metabolite_files <- list.files(
 "./input/metabolites/",
 pattern = "\\.tsv\\.gz$",
 full.names = TRUE
)
metabolite_files <- metabolite_files[seq_len(min(1400, length(metabolite_files)))]
outcome_file <- "./input/outcome_standard.tsv.gz"

# TRUE: 并行;FALSE: 串行for循环
use_parallel <- TRUE

# =========================
# 1) 单个代谢物的MR函数
# =========================
run_one_mr <- function(file_i, outcome_file) {
 trait_id <- tools::file_path_sans_ext(tools::file_path_sans_ext(basename(file_i)))

 tryCatch({
  # 1.1 暴露数据
  exposure_dat <- read_exposure_data(
   filename = file_i,
   sep = "\t",
   snp_col = "rsid",
   beta_col = "beta",
   se_col = "se",
   effect_allele_col = "ea",
   other_allele_col = "oa",
   eaf_col = "eaf",
   pval_col = "p",
   samplesize_col = "n",
   phenotype_col = "metabolite"
  ) %>%
   filter(pval.exposure < 5e-8)

  if (nrow(exposure_dat) == 0) stop("无显著工具变量(p<5e-8)")

  # 1.2 LD clumping
  exposure_dat <- clump_data(
   exposure_dat,
   clump_kb = 10000,
   clump_r2 = 0.001,
   clump_p1 = 5e-8,
   clump_p2 = 1e-5
  )
  if (nrow(exposure_dat) < 3) stop("clump后IV数量不足3")

  # 1.3 结局数据
  outcome_dat <- read_outcome_data(
   snps = exposure_dat$SNP,
   filename = outcome_file,
   sep = "\t",
   snp_col = "rsid",
   beta_col = "beta",
   se_col = "se",
   effect_allele_col = "ea",
   other_allele_col = "oa",
   eaf_col = "eaf",
   pval_col = "p",
   samplesize_col = "n"
  )

  # 1.4 协调 + MR
  dat <- harmonise_data(exposure_dat, outcome_dat, action = 2) %>%
   filter(mr_keep == TRUE)
  if (nrow(dat) < 3) stop("协调后SNP不足3")

  mr_res <- mr(
   dat,
   method_list = c("mr_ivw", "mr_weighted_median", "mr_egger_regression", "mr_raps")
  ) %>%
   mutate(metabolite = trait_id)

  het_res <- mr_heterogeneity(
   dat,
   method_list = c("mr_ivw", "mr_egger_regression")
  ) %>%
   mutate(metabolite = trait_id, test = "heterogeneity")

  pleio_res <- mr_pleiotropy_test(dat) %>%
   mutate(metabolite = trait_id, test = "pleiotropy")

  list(
   ok = TRUE,
   trait = trait_id,
   mr = mr_res,
   qc = bind_rows(het_res, pleio_res)
  )

 }, error = function(e) {
  list(
   ok = FALSE,
   trait = trait_id,
   error = e$message
  )
 })
}

# =========================
# 2) 批量执行(串行/并行)
# =========================
if (use_parallel) {
 n_workers <- max(1, min(8, as.integer(availableCores()) - 1))
 plan(multisession, workers = n_workers)
 message("并行模式,workers = ", n_workers)

 res_list <- future_map(
  metabolite_files,
  run_one_mr,
  outcome_file = outcome_file,
  .options = furrr_options(
   seed = 20260305,
   packages = c("data.table", "dplyr", "TwoSampleMR")
  )
 )
 plan(sequential)
} else {
 message("串行for循环模式")
 res_list <- vector("list", length(metabolite_files))
 for (i in seq_along(metabolite_files)) {
  message(sprintf("[%d/%d] %s", i, length(metabolite_files), basename(metabolite_files[i])))
  res_list[[i]] <- run_one_mr(metabolite_files[i], outcome_file)
 }
}

# =========================
# 3) 结果汇总
# =========================
ok_list <- res_list[sapply(res_list, function(x) isTRUE(x$ok))]
fail_list <- res_list[!sapply(res_list, function(x) isTRUE(x$ok))]

mr_table <- bind_rows(lapply(ok_list, function(x) x$mr))
qc_table <- bind_rows(lapply(ok_list, function(x) x$qc))
err_table <- bind_rows(lapply(fail_list, function(x) {
 data.frame(
  metabolite = x$trait,
  error = x$error,
  stringsAsFactors = FALSE
 )
}))

if (nrow(mr_table) > 0) {
 mr_table <- mr_table %>%
  group_by(method) %>%
  mutate(p_adj_fdr = p.adjust(pval, method = "fdr")) %>%
  ungroup()
}

# =========================
# 4) 导出
# =========================
fwrite(mr_table, "./output/batch_mr_results_1400.tsv", sep = "\t")
fwrite(qc_table, "./output/batch_qc_results_1400.tsv", sep = "\t")
fwrite(err_table, "./output/batch_error_log_1400.tsv", sep = "\t")

message("完成:成功 ", nrow(mr_table), " 条MR结果;失败 ", nrow(err_table), " 个代谢物任务。")

并行提速实战建议