## ----setup, include = FALSE---------------------------------------------------
# Chunk defaults for the whole vignette: `collapse` and the "#>" output
# prefix are applied to every chunk that follows.
knitr::opts_chunk$set(comment = "#>", collapse = TRUE)

# Render with English R messages so the published vignette reads the same
# whatever locale the contributor builds it in.
Sys.setenv(LANGUAGE = "en")

# Some platforms reject "LC_MESSAGES"; both the error and the warning
# outcome are swallowed so rendering carries on either way.
invisible(tryCatch(
  Sys.setlocale(category = "LC_MESSAGES", locale = "C"),
  warning = function(w) NULL,
  error   = function(e) NULL
))

## ----load---------------------------------------------------------------------
library(genproc)

## ----workspace----------------------------------------------------------------
# Scratch directories under the session temp dir: one holds the CSV inputs,
# the other receives the converted outputs.
src_dir <- file.path(tempdir(), "genproc-vignette-src")
dst_dir <- file.path(tempdir(), "genproc-vignette-dst")
invisible(lapply(
  c(src_dir, dst_dir),
  dir.create,
  showWarnings = FALSE,
  recursive = TRUE
))

# Three small input files, each the first rows of a built-in data set.
invisible(Map(
  function(data, name) {
    write.csv(data, file.path(src_dir, name), row.names = FALSE)
  },
  list(head(iris), head(mtcars), head(airquality)),
  c("a.csv", "b.csv", "c.csv")
))

# Read one CSV file and store its contents as an RDS file.
#   src_dir, src_file: directory and file name of the CSV to read.
#   dst_dir, dst_file: directory and file name of the RDS file to write.
# Called for its side effect; the value is whatever saveRDS() returns.
convert <- function(src_dir, src_file, dst_dir, dst_file) {
  csv_path <- file.path(src_dir, src_file)
  contents <- read.csv(csv_path)
  rds_path <- file.path(dst_dir, dst_file)
  saveRDS(contents, file = rds_path)
}

# One row per file to convert. The column names mirror the arguments of
# `convert`; the two directory columns are single values recycled over rows.
mask <- data.frame(
  src_dir  = src_dir,
  src_file = paste0(c("a", "b", "c"), ".csv"),
  dst_dir  = dst_dir,
  dst_file = paste0(c("a", "b", "c"), ".rds"),
  stringsAsFactors = FALSE
)

# Hand the function and the mask to genproc() and keep the returned object
# for the inspection chunks below.
result <- genproc(convert, mask)

## ----result-names-------------------------------------------------------------
# Class vector and top-level element names of the object genproc() returned.
class(result)
names(result)

## ----log-head-----------------------------------------------------------------
# Print the `log` element of the result.
result$log

## ----repro-str----------------------------------------------------------------
# Compact structure of the `reproducibility` element, top level only.
str(result$reproducibility, max.level = 1)

## ----inputs-shape-------------------------------------------------------------
# Same one-level overview for its `inputs` sub-element.
str(result$reproducibility$inputs, max.level = 1)

## ----inputs-auto--------------------------------------------------------------
# Single-column mask: each row holds the full path of one CSV file.
mask_paths <- data.frame(
  csv_in = file.path(src_dir, paste0(c("a", "b", "c"), ".csv")),
  stringsAsFactors = FALSE
)

# Return the number of data rows in one CSV file.
do_one <- function(csv_in) {
  parsed <- read.csv(csv_in)
  nrow(parsed)
}

# Baseline run; print the `files` element recorded under its inputs.
run0 <- genproc(do_one, mask_paths)
run0$reproducibility$inputs$files

## ----inputs-dedup-------------------------------------------------------------
# A one-line config file that every row of the mask points to.
config_path <- file.path(src_dir, "config.yml")
writeLines(text = "threshold: 10", con = config_path)

mask_with_config <- data.frame(
  csv_in = file.path(src_dir, paste0(c("a", "b", "c"), ".csv")),
  config = config_path,                       # same value across rows
  stringsAsFactors = FALSE
)

# Same row count as `do_one`. `config` is accepted so the mask column has a
# matching argument, but the body never reads it.
do_one_cfg <- function(csv_in, config) {
  parsed <- read.csv(csv_in)
  nrow(parsed)
}

run_shared <- genproc(do_one_cfg, mask_with_config)

# 4 rows: 3 unique csv_in + 1 config
nrow(run_shared$reproducibility$inputs$files)

# 6 rows: 3 cases x 2 input columns
nrow(run_shared$reproducibility$inputs$refs)

## ----inputs-diff--------------------------------------------------------------
# Overwrite a.csv with the full iris data set: strictly more content than
# before, so the file size changes.
write.csv(x = iris, file = file.path(src_dir, "a.csv"), row.names = FALSE)

# Second run over the same mask, then pass both runs to diff_inputs().
run1 <- genproc(do_one, mask_paths)
diff_inputs(run0, run1)

## ----broken-------------------------------------------------------------------
# Delete one source file so the b.csv row of `mask` points at a file that
# no longer exists, then run the conversion again.
file.remove(file.path(src_dir, "b.csv"))
result_broken <- genproc(convert, mask)

# Success and error counters reported on the returned object.
result_broken$n_success
result_broken$n_error

## ----broken-row---------------------------------------------------------------
# Keep what errors() returns for this run, then show its error message
# column and the first traceback entry.
bad <- errors(result_broken)
bad$error_message
cat(bad$traceback[1], "\n")

## ----restore------------------------------------------------------------------
# Recreate b.csv with the same content it was originally written with.
write.csv(head(mtcars), file.path(src_dir, "b.csv"), row.names = FALSE)

## ----inspect-errors, eval = FALSE---------------------------------------------
# errors(result_broken)

## ----inspect-summary----------------------------------------------------------
# Summarise the run that was made while b.csv was missing.
summary(result_broken)

## ----inspect-rerun-failed, eval = FALSE---------------------------------------
# # Re-run with a hardened f that handles the missing file gracefully.
# result_fixed <- rerun_failed(
#   result_broken,
#   f = function(src_dir, src_file, dst_dir, dst_file) {
#     in_path <- file.path(src_dir, src_file)
#     if (!file.exists(in_path)) return(NA)
#     df <- read.csv(in_path)
#     saveRDS(df, file.path(dst_dir, dst_file))
#   }
# )

## ----inspect-rerun-affected, eval = FALSE-------------------------------------
# d <- diff_inputs(run0, run1)
# refreshed <- rerun_affected(run0, d, f = do_one)

## ----blocks-fef---------------------------------------------------------------
# An example that works for ONE specific case
input_path  <- file.path(src_dir, "a.csv")
output_path <- file.path(dst_dir, "a-from-example.rds")

# expression() keeps the block unevaluated, so the helper below receives the
# code itself rather than the result of running it.
example <- expression({
  df <- read.csv(input_path)
  saveRDS(df, output_path)
})

# Build a function from the example and list the formals it ended up with.
fn <- from_example_to_function(example)
formals(fn)

## ----blocks-ftm---------------------------------------------------------------
# Ask for the mask that corresponds to `fn` and print it.
mask_template <- from_function_to_mask(fn)
mask_template

## ----blocks-rfp---------------------------------------------------------------
# Rename the parameters of `fn` via a named character vector.
# NOTE(review): presumably each name is the current parameter and each value
# its replacement; confirm against rename_function_params().
fn_named <- rename_function_params(
  fn, c(param_1 = "input_path", param_2 = "output_path")
)
formals(fn_named)

## ----blocks-full--------------------------------------------------------------
# Mask for the renamed function: one input/output path pair per row, with
# column names equal to the parameter names given to `fn_named`.
mask_built <- data.frame(
  input_path  = file.path(src_dir, paste0(c("a", "b", "c"), ".csv")),
  output_path = file.path(dst_dir, paste0(c("a", "b", "c"), "2.rds")),
  stringsAsFactors = FALSE
)

# Run it and show only the success counter.
genproc(fn_named, mask_built)$n_success

## ----fmapping, eval = FALSE---------------------------------------------------
# # `f` uses generic names; the mask uses domain names.
# f <- function(input_dir, input_file, output_dir, output_file) {
#   df <- read.csv(file.path(input_dir, input_file))
#   saveRDS(df, file.path(output_dir, output_file))
# }
# 
# genproc(
#   f = f,
#   mask = mask,
#   f_mapping = c(
#     "input_dir"   = "src_dir",
#     "input_file"  = "src_file",
#     "output_dir"  = "dst_dir",
#     "output_file" = "dst_file"
#   )
# )

## ----parallel-plan, eval = FALSE----------------------------------------------
# future::plan(future::multisession, workers = 6)
# 
# result_1 <- genproc(convert, mask, parallel = parallel_spec())
# result_2 <- genproc(convert, mask, parallel = parallel_spec())
# # reuses the same workers

## ----parallel-oneoff, eval = FALSE--------------------------------------------
# genproc(convert, mask, parallel = parallel_spec(workers = 4))

## ----nb-example, eval = FALSE-------------------------------------------------
# job <- genproc(convert, mask, nonblocking = nonblocking_spec())
# status(job)         # "running", "done (not collected)", "done", or "error"
# job <- await(job)   # blocks until resolution
# job$log

## ----compose, eval = FALSE----------------------------------------------------
# job <- genproc(
#   convert, mask,
#   parallel    = parallel_spec(workers = 6),
#   nonblocking = nonblocking_spec()
# )
# # get control back immediately
# # ... do other work ...
# job <- await(job)

## ----progress-example, eval = FALSE-------------------------------------------
# library(progressr)
# 
# with_progress(
#   result <- genproc(my_fn, mask, parallel = parallel_spec(workers = 4))
# )

