## ----setup, include = FALSE---------------------------------------------------
# Probe the optional back ends once, without attaching them. The resulting
# flags are consumed by the `eval =` options of the HDF5 and shard chunks.
has_shard <- requireNamespace("shard", quietly = TRUE)
has_hdf5 <- requireNamespace("hdf5r", quietly = TRUE)

# Document-wide chunk defaults: hide messages and warnings, prefix printed
# output with "#>", and collapse source and output into one block.
knitr::opts_chunk$set(
  message = FALSE,
  warning = FALSE,
  comment = "#>",
  collapse = TRUE
)

## ----load-delarr--------------------------------------------------------------
library(delarr)

## ----make-example-matrix------------------------------------------------------
# Reproducible 12 x 8 example matrix of standard-normal draws, filled
# column-wise, with labelled samples (rows) and features (columns).
set.seed(11)
mat <- matrix(rnorm(12 * 8), nrow = 12, ncol = 8)
rownames(mat) <- sprintf("sample_%d", 1:12)
colnames(mat) <- sprintf("feature_%d", 1:8)

## ----inspect-plan-------------------------------------------------------------
# Build the delayed pipeline used throughout the rest of the script:
# wrap `mat`, index with `[, -1]` (presumably dropping the first column, as
# in base R -- confirm against delarr's `[` method), map x^2 + 1 over the
# values, then apply d_where() with the predicate x > 1.25 and fill = 0.
pipe <- d_where(
  d_map(delarr(mat)[, -1], ~ .x^2 + 1),
  function(x) x > 1.25,
  fill = 0
)

# Ask for the execution plan with a fixed chunk size of 3 and print it.
plan <- pipe |> explain(chunk_size = 3L)
plan

## ----check-plan, include = FALSE----------------------------------------------
# The plan must report the pipeline's own dimensions, chunk along columns,
# and use ceiling(ncol / 3) chunks for the chunk size requested above.
stopifnot(identical(plan$output_dim, dim(pipe)))
stopifnot(identical(plan$chunk_margin, "cols"))
stopifnot(identical(plan$chunk_count, ceiling(ncol(pipe) / 3)))

## ----adaptive-collect---------------------------------------------------------
# Plan and materialise the same pipeline, this time passing a byte budget
# (`target_bytes`) instead of an explicit chunk size.
adaptive_plan <- pipe |> explain(target_bytes = 256)
adaptive_plan

adaptive_result <- pipe |> collect(target_bytes = 256)
adaptive_result |> dim()

## ----check-adaptive-collect, include = FALSE----------------------------------
# Collecting with a fixed chunk size must give the same finite values as the
# byte-budgeted run.
fixed_result <- pipe |> collect(chunk_size = 3L)
stopifnot(all(is.finite(adaptive_result)))
stopifnot(isTRUE(all.equal(adaptive_result, fixed_result)))

## ----multi-reduce-------------------------------------------------------------
# Compute several named reductions (sum, mean, max) over the rows in a
# single d_reduce_many() call, using chunks of size 3.
row_summary <- delarr(mat) |>
  d_reduce_many(
    fns = list(sum = sum, mean = mean, max = max),
    dim = "rows",
    chunk_size = 3L
  )

# Show the first four rows, keeping the matrix shape.
row_summary[seq_len(4L), , drop = FALSE]

## ----check-multi-reduce, include = FALSE--------------------------------------
# The result must be a matrix whose named columns agree with the equivalent
# base R row-wise computations on `mat`.
stopifnot(is.matrix(row_summary))
stopifnot(isTRUE(all.equal(row_summary[, "sum"], rowSums(mat))))
stopifnot(isTRUE(all.equal(row_summary[, "mean"], rowMeans(mat))))
stopifnot(isTRUE(all.equal(row_summary[, "max"], apply(mat, 1L, max))))

## ----block-apply--------------------------------------------------------------
# Run a function over column blocks of size 3; each call returns the column
# means of the block it receives.
col_blocks <- delarr(mat) |>
  block_apply(
    margin = "cols",
    size = 3L,
    fn = function(block) colMeans(block)
  )

# Flatten the per-block results into one unnamed numeric vector and print it.
block_means <- unlist(col_blocks, use.names = FALSE)
block_means

## ----check-block-apply, include = FALSE---------------------------------------
# The concatenated block means must equal the column means of the full matrix.
stopifnot(all(is.finite(block_means)))
stopifnot(isTRUE(all.equal(block_means, unname(colMeans(mat)))))

## ----delayed-matmul-----------------------------------------------------------
# Right-hand operand: a 6 x 5 matrix of normal draws (continues the RNG
# stream seeded when `mat` was created).
rhs <- matrix(rnorm(6 * 5), nrow = 6, ncol = 5)

# Multiply the first six columns of `mat` by `rhs` through d_matmul(), slice
# rows 1-4 and columns 1-3 of the delayed product, then collect that slice
# with a chunk size of 2.
product_block <- collect(
  d_matmul(delarr(mat[, 1:6, drop = FALSE]), delarr(rhs))[1:4, 1:3],
  chunk_size = 2L
)

product_block

## ----check-delayed-matmul, include = FALSE------------------------------------
# Reference: the same slice taken from the in-memory base R product.
expected_block <- (mat[, 1:6, drop = FALSE] %*% rhs)[1:4, 1:3, drop = FALSE]
stopifnot(all(is.finite(product_block)))
stopifnot(isTRUE(all.equal(product_block, expected_block)))

## ----prepare-scaled-hdf5, include = FALSE, eval = has_hdf5--------------------
# Scratch files for the HDF5 round trip; both are deleted after the checks
# at the end of this section have passed.
tf_in <- tempfile(fileext = ".h5")
tf_out <- tempfile(fileext = ".h5")

# Store the example matrix in the input file under the dataset name "X".
write_hdf5(mat, tf_in, "X")

## ----stream-scaled-hdf5, eval = has_hdf5--------------------------------------
# Open the dataset as a delayed array and request column-wise centring and
# scaling.
X <- delarr_hdf5(tf_in, "X")
scaled <- d_scale(X, dim = "cols", center = TRUE, scale = TRUE)

# Writer targeting dataset "X_scaled" in the output file.
# NOTE(review): `chunk = c(6L, 4L)` looks like the on-disk chunk shape --
# confirm against hdf5_writer()'s documentation.
writer <- hdf5_writer(tf_out, "X_scaled", ncol = ncol(X), chunk = c(6L, 4L))

# Collect into the writer rather than into an in-memory result.
collect(scaled, into = writer, chunk_size = 4L)

## ----inspect-scaled-hdf5, eval = has_hdf5-------------------------------------
# Read the written dataset back and tabulate per-column mean and standard
# deviation, rounded to six decimals.
disk_result <- read_hdf5(tf_out, "X_scaled")
rbind(
  mean = round(colMeans(disk_result), digits = 6),
  sd = round(apply(disk_result, 2L, stats::sd), digits = 6)
)

## ----check-scaled-hdf5, include = FALSE, eval = has_hdf5----------------------
# In-memory reference: subtract each column's mean, then divide by each
# column's standard deviation.
centered <- sweep(mat, MARGIN = 2L, STATS = colMeans(mat), FUN = "-")
reference <- sweep(
  centered,
  MARGIN = 2L,
  STATS = apply(mat, 2L, stats::sd),
  FUN = "/"
)

# The on-disk values must be finite, match the reference (ignoring dimnames),
# and have column means of ~0 and column standard deviations of ~1.
stopifnot(all(is.finite(disk_result)))
stopifnot(
  isTRUE(all.equal(unname(disk_result), unname(reference), tolerance = 1e-8))
)
stopifnot(all(abs(colMeans(disk_result)) < 1e-8))
stopifnot(all(abs(apply(disk_result, 2L, stats::sd) - 1) < 1e-8))

# Remove both scratch files.
unlink(c(tf_in, tf_out))

## ----shard-collect, eval = has_shard------------------------------------------
# Shard-backed variant: double every value, sum over the rows, and collect
# with collect_shard() using two workers and a chunk size of 3.
shard_result <- collect_shard(
  d_reduce(d_map(delarr_shard(mat), ~ .x * 2), sum, dim = "rows"),
  workers = 2L,
  chunk_size = 3L
)

shard_result |> head()

## ----check-shard-collect, include = FALSE, eval = has_shard-------------------
# The result must be finite and equal to the row sums of the doubled matrix.
stopifnot(all(is.finite(shard_result)))
stopifnot(isTRUE(all.equal(shard_result, rowSums(mat * 2))))

## ----profile-pipeline---------------------------------------------------------
# Profile collecting the shared pipeline: two repetitions, chunk size 3.
profile <- pipe |> profile_collect(reps = 2L, chunk_size = 3L)
profile

## ----check-profile-pipeline, include = FALSE----------------------------------
# The profile must echo the repetition count and report finite elapsed
# timings with a non-negative minimum.
stopifnot(identical(profile$reps, 2L))
stopifnot(all(is.finite(profile$elapsed)))
stopifnot(profile$min_sec >= 0)

