## ----setup, include = FALSE---------------------------------------------------
# Default knitr chunk options applied to every chunk of the document.
knitr::opts_chunk$set(
  collapse = TRUE, comment = "#>",
  message = FALSE, warning = FALSE
)

# TRUE when the hdf5r namespace can be loaded; the HDF5 chunks further down
# are tagged `eval = has_hdf5` in their headers.
has_hdf5 <- requireNamespace("hdf5r", quietly = TRUE)

## ----load-delarr--------------------------------------------------------------
library(delarr)

## ----build-lazy-pipeline------------------------------------------------------
# Fix the RNG seed so the example matrix is reproducible.
set.seed(1)

# 6 x 4 matrix of standard-normal draws (filled column-wise), then labelled
# with sample names on the rows and feature names on the columns.
mat <- matrix(rnorm(24), nrow = 6, ncol = 4)
dimnames(mat) <- list(
  sprintf("sample_%d", seq_len(6)),
  sprintf("feature_%d", seq_len(4))
)

# Compose the pipeline on `mat`, innermost call first:
#   delarr()   wraps the matrix,
#   d_center() centres along "rows",
#   d_map()    multiplies every element by 0.5,
#   d_reduce() applies `mean` along "rows".
# The result is evaluated later by collect().
lazy_mean <- d_reduce(
  d_map(d_center(delarr(mat), dim = "rows"), ~ .x * 0.5),
  mean,
  dim = "rows"
)

# Auto-print the pipeline object.
lazy_mean

## ----collect-lazy-pipeline----------------------------------------------------
# Evaluate the pipeline, passing a chunk size of 2 to collect().
row_summary <- lazy_mean |>
  collect(chunk_size = 2L)

# Largest absolute value in the collected result.
max(abs(row_summary))

## ----check-lazy-pipeline, include = FALSE-------------------------------------
# Sanity checks on the collected result: no NA/NaN/Inf entries ...
stopifnot(all(is.finite(row_summary)))
# ... and every entry is zero up to a 1e-10 tolerance.
stopifnot(all(abs(row_summary) < 1e-10))

## ----broadcast-vectors--------------------------------------------------------
# Length-6 vector (one value per row of `mat`) and length-4 vector (one value
# per column of `mat`).
row_bias <- as.numeric(-1:4)
col_scale <- c(1, 0.5, 2, 1.5)

# Add the row vector, multiply by the column vector, then evaluate.
broadcasted <- collect(
  (delarr(mat) + row_bias) * col_scale,
  chunk_size = 2L
)

# Show the first three rows, keeping the matrix shape.
broadcasted[seq_len(3), , drop = FALSE]

## ----check-broadcast-vectors, include = FALSE---------------------------------
# Base-R reference: add `row_bias` along margin 1 (rows), then multiply by
# `col_scale` along margin 2 (columns).
expected <- mat |>
  sweep(1L, row_bias, "+") |>
  sweep(2L, col_scale, "*")

# The collected result must match the reference up to numeric tolerance.
stopifnot(
  isTRUE(all.equal(broadcasted, expected))
)

## ----broadcast-square, eval = FALSE-------------------------------------------
# sq <- matrix(1:9, 3, 3)
# biased <- delarr(sq) + c(10, 20, 30)
# #> Warning: Ambiguous broadcast: a length-3 vector against a square 3x3
# #> matrix is interpreted as row-aligned (one value per row) ...
# collect(biased)

## ----broadcast-square-cols, eval = FALSE--------------------------------------
# collect(delarr(sq) + matrix(c(10, 20, 30), 3, 3, byrow = TRUE))

## ----prepare-hdf5-input, include = FALSE, eval = has_hdf5---------------------
# This chunk is declared `eval = has_hdf5`, so honour that flag in the plain
# script too: skip the HDF5 setup when the hdf5r namespace is unavailable
# (`has_hdf5` is computed in the setup chunk).
if (has_hdf5) {
  # Temporary files for the input dataset and for the streamed output.
  tf_in <- tempfile(fileext = ".h5")
  tf_out <- tempfile(fileext = ".h5")

  # 5 x 6 matrix of uniform draws, written to `tf_in` under the name "X".
  input <- matrix(runif(30), 5, 6)
  write_hdf5(input, tf_in, "X")
}

## ----stream-hdf5, eval = has_hdf5---------------------------------------------
# This chunk is declared `eval = has_hdf5`; guard the code the same way so the
# script does not attempt HDF5 I/O when the hdf5r namespace is unavailable.
if (has_hdf5) {
  # Open dataset "X" from the input file.
  X <- delarr_hdf5(tf_in, "X")

  # Writer targeting dataset "X_z" in the output file, with as many columns
  # as the input.
  writer <- hdf5_writer(tf_out, "X_z", ncol = ncol(X), chunk = c(5L, 3L))

  # Z-score along "cols" and send the result to the writer.
  collect(X |> d_zscore(dim = "cols"), into = writer, chunk_size = 3L)
}

## ----inspect-hdf5-result, eval = has_hdf5-------------------------------------
# This chunk is declared `eval = has_hdf5`; guard the code the same way so the
# script does not attempt HDF5 I/O when the hdf5r namespace is unavailable.
if (has_hdf5) {
  # Read the written dataset back in.
  z <- read_hdf5(tf_out, "X_z")

  # Per-column mean and standard deviation, rounded to six decimals.
  rbind(
    mean = round(colMeans(z), 6),
    sd = round(apply(z, 2L, stats::sd), 6)
  )
}

## ----check-hdf5-result, include = FALSE, eval = has_hdf5----------------------
# This chunk is declared `eval = has_hdf5`; guard the code the same way so the
# checks and cleanup only run when the HDF5 chunks above have run.
if (has_hdf5) {
  # Every column of the z-scored output must be finite, with mean 0 and
  # standard deviation 1 up to a 1e-8 tolerance.
  stopifnot(
    all(is.finite(z)),
    all(abs(colMeans(z)) < 1e-8),
    all(abs(apply(z, 2L, stats::sd) - 1) < 1e-8)
  )

  # Remove the temporary HDF5 files.
  unlink(c(tf_in, tf_out))
}

## ----make-custom-source, include = FALSE--------------------------------------
# 10 x 6 integer matrix holding 1..60 in column-major order, with labelled
# rows and columns; it is the in-memory data behind the custom backend.
source_mat <- array(
  seq_len(60),
  dim = c(10L, 6L),
  dimnames = list(
    sprintf("row_%d", seq_len(10)),
    sprintf("col_%d", seq_len(6))
  )
)

## ----custom-backend-----------------------------------------------------------
# Backend whose data comes from `source_mat` through a user-supplied `pull`
# callback.
custom <- delarr_backend(
  nrow = nrow(source_mat),
  ncol = ncol(source_mat),
  # Return the requested sub-matrix; a NULL index selects the full extent of
  # that dimension, and `drop = FALSE` keeps the result a matrix.
  pull = function(rows = NULL, cols = NULL) {
    row_idx <- if (is.null(rows)) seq_len(nrow(source_mat)) else rows
    col_idx <- if (is.null(cols)) seq_len(ncol(source_mat)) else cols
    source_mat[row_idx, col_idx, drop = FALSE]
  },
  dimnames = dimnames(source_mat)
)

# Take rows 1-4 and columns 2-5 of the backend, square each element, and
# evaluate with a chunk size of 2.
custom_result <- collect(
  d_map(custom[1:4, 2:5], ~ .x^2),
  chunk_size = 2L
)

# Auto-print the collected result.
custom_result

## ----check-custom-backend, include = FALSE------------------------------------
# The backend result must match squaring the same slice of `source_mat`
# directly in base R (compared up to numeric tolerance).
stopifnot(
  isTRUE(
    all.equal(custom_result, source_mat[1:4, 2:5]^2)
  )
)

