The README introduces the five layers on a minimal
example. This vignette walks through the pieces a user needs once they
start depending on genproc for real work: the shape of the
result, how errors are reported, how the optional layers compose, and
what the current edges are.
This vignette reuses the synthetic file-conversion task from the README: one row
per file, with convert() as the per-case function.
src_dir <- file.path(tempdir(), "genproc-vignette-src")
dst_dir <- file.path(tempdir(), "genproc-vignette-dst")
dir.create(src_dir, showWarnings = FALSE, recursive = TRUE)
dir.create(dst_dir, showWarnings = FALSE, recursive = TRUE)
write.csv(head(iris), file.path(src_dir, "a.csv"), row.names = FALSE)
write.csv(head(mtcars), file.path(src_dir, "b.csv"), row.names = FALSE)
write.csv(head(airquality), file.path(src_dir, "c.csv"), row.names = FALSE)
convert <- function(src_dir, src_file, dst_dir, dst_file) {
  df <- read.csv(file.path(src_dir, src_file))
  saveRDS(df, file.path(dst_dir, dst_file))
}
mask <- data.frame(
  src_dir = src_dir,
  src_file = c("a.csv", "b.csv", "c.csv"),
  dst_dir = dst_dir,
  dst_file = c("a.rds", "b.rds", "c.rds"),
  stringsAsFactors = FALSE
)
result <- genproc(convert, mask)
genproc_result
result is an S3 list with a stable, documented set of
fields:
class(result)
#> [1] "genproc_result"
names(result)
#> [1] "log" "reproducibility" "n_success"
#> [4] "n_error" "duration_total_secs" "status"
- log: one row per case. Columns, in order: case_id, the mask parameter
  values (in the order they appear in the mask), then success,
  error_message, traceback, duration_secs.
- reproducibility: the environment snapshot captured at run start (see below).
- n_success, n_error: summary counts.
- duration_total_secs: total wall-clock time.
- status: "done" for a completed synchronous run; for a non-blocking run,
  one of "running" (future not resolved yet), "done (not collected)"
  (future resolved but result not yet collected via await()), "done"
  (collected), or "error".
These fields are guaranteed stable across minor versions; new fields
may be added (e.g. worker_id for parallel runs), but
existing ones will never be removed or renamed.
Column order is designed for a human scanning a run:
result$log
#> case_id
#> 1 case_0001
#> 2 case_0002
#> 3 case_0003
#> src_dir
#> 1 C:\\Users\\rheri\\AppData\\Local\\Temp\\RtmpM78eGk/genproc-vignette-src
#> 2 C:\\Users\\rheri\\AppData\\Local\\Temp\\RtmpM78eGk/genproc-vignette-src
#> 3 C:\\Users\\rheri\\AppData\\Local\\Temp\\RtmpM78eGk/genproc-vignette-src
#> src_file
#> 1 a.csv
#> 2 b.csv
#> 3 c.csv
#> dst_dir
#> 1 C:\\Users\\rheri\\AppData\\Local\\Temp\\RtmpM78eGk/genproc-vignette-dst
#> 2 C:\\Users\\rheri\\AppData\\Local\\Temp\\RtmpM78eGk/genproc-vignette-dst
#> 3 C:\\Users\\rheri\\AppData\\Local\\Temp\\RtmpM78eGk/genproc-vignette-dst
#> dst_file success error_message traceback duration_secs
#> 1 a.rds TRUE <NA> <NA> 0.00
#> 2 b.rds TRUE <NA> <NA> 0.02
#> 3 c.rds TRUE <NA> <NA> 0.00
case_id is stable and index-based
(case_0001, case_0002, …) for now. A
content-based variant is on the roadmap, for use cases where rows of the
mask can be reordered between runs.
str(result$reproducibility, max.level = 1)
#> List of 11
#> $ timestamp : POSIXct[1:1], format: "2026-05-07 21:14:19"
#> $ r_version : chr "R version 4.5.1 (2025-06-13 ucrt)"
#> $ platform : chr "x86_64-w64-mingw32"
#> $ os : chr "Windows 10 x64"
#> $ locale : chr "LC_COLLATE=C;LC_CTYPE=French_France.utf8;LC_MONETARY=French_France.utf8;LC_NUMERIC=C;LC_TIME=French_France.utf8"
#> $ timezone : chr "Europe/Paris"
#> $ packages : Named chr [1:29] "0.2.0" "0.6.39" "2.6.1" "1.2.0" ...
#> ..- attr(*, "names")= chr [1:29] "genproc" "digest" "R6" "fastmap" ...
#> $ mask_snapshot:'data.frame': 3 obs. of 4 variables:
#> $ parallel : NULL
#> $ nonblocking : NULL
#> $ inputs :List of 3
- timestamp: POSIXct, start time of the run.
- r_version, platform, os, locale, timezone: system info.
- packages: named character vector of package -> version, for every
  package attached or loaded via namespace at run start.
- mask_snapshot: the exact mask used (the value itself, not a reference
  to it).
- parallel: NULL for a sequential run, or a plain list mirroring the
  parallel_spec() used. The class is dropped so that the snapshot stays
  portable across serialization formats. The list also carries
  effective_strategy: the strategy actually applied by genproc(), which
  differs from strategy when the user passed workers without an explicit
  strategy and genproc() auto-defaulted to "multisession". Both fields are
  recorded so the snapshot is self-explanatory: strategy is what the user
  asked for, effective_strategy is what was applied.
- nonblocking: same pattern, for nonblocking_spec().
- inputs: a fingerprint of every file the mask refers to, or NULL if input
  tracking was disabled (track_inputs = FALSE). See the next section.
The snapshot lives inside the result. You can compare two results by
comparing their $reproducibility slots directly.
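As an illustration of comparing two snapshots field by field, here is a minimal base-R sketch. The helper snapshot_diff() and the two hand-built lists are hypothetical stand-ins for result$reproducibility; they are not part of genproc.

```r
# Hypothetical helper (not part of genproc): returns the names of the
# snapshot fields whose values differ between two runs.
snapshot_diff <- function(a, b) {
  fields <- union(names(a), names(b))
  changed <- vapply(
    fields,
    function(k) !identical(a[[k]], b[[k]]),
    logical(1)
  )
  fields[changed]
}

# Hand-built stand-ins for result$reproducibility (illustrative values only).
snap_a <- list(
  r_version = "R version 4.5.1",
  timezone  = "Europe/Paris",
  packages  = c(genproc = "0.2.0")
)
snap_b <- list(
  r_version = "R version 4.5.1",
  timezone  = "UTC",
  packages  = c(genproc = "0.2.0")
)

snapshot_diff(snap_a, snap_b)
```

On real snapshots the timestamp field would differ between any two runs, so it is usually worth dropping it before comparing.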
reproducibility$inputs is the layer that protects
against silent drift of upstream files. It is captured at t0 of the run,
alongside the rest of the snapshot.
str(result$reproducibility$inputs, max.level = 1)
#> List of 3
#> $ method: chr "stat"
#> $ files :'data.frame': 0 obs. of 3 variables:
#> $ refs :'data.frame': 0 obs. of 3 variables:
- method: currently "stat". Reserved for future extensions (e.g. "md5"
  for content hashing).
- files: a deduplicated table of every file referenced by the mask, with
  its size in bytes and last-modified time. One row per unique path: a
  config file shared across 100 cases produces a single row.
- refs: the (case_id, column, path) triples saying who referenced what.
  Joins back to files by path.
By default, every character column of the mask whose non-NA values
are existing files (and contain a path separator) is treated as an input
column. The mask used in this vignette has its paths split
across src_dir and src_file, so the heuristic
finds nothing useful: src_dir is a directory (excluded), and
src_file values are bare names (no separator). For a mask
that holds absolute paths directly:
mask_paths <- data.frame(
  csv_in = file.path(src_dir, c("a.csv", "b.csv", "c.csv")),
  stringsAsFactors = FALSE
)
do_one <- function(csv_in) nrow(read.csv(csv_in))
run0 <- genproc(do_one, mask_paths)
run0$reproducibility$inputs$files
#> path size
#> 1 C:/Users/rheri/AppData/Local/Temp/RtmpM78eGk/genproc-vignette-src/a.csv 221
#> 2 C:/Users/rheri/AppData/Local/Temp/RtmpM78eGk/genproc-vignette-src/b.csv 303
#> 3 C:/Users/rheri/AppData/Local/Temp/RtmpM78eGk/genproc-vignette-src/c.csv 161
#> mtime
#> 1 2026-05-07 21:14:19
#> 2 2026-05-07 21:14:19
#> 3 2026-05-07 21:14:19
- genproc(..., input_cols = c("col1", "col2")) bypasses the heuristic and
  tracks exactly the named columns. Paths that don’t exist at capture time
  are recorded with NA size/mtime and a warning is emitted.
- genproc(..., skip_input_cols = c("col")) keeps the heuristic but excludes
  a column (useful when a label column happens to match an existing file
  in cwd).
- genproc(..., track_inputs = FALSE) disables tracking entirely.
  result$reproducibility$inputs is NULL.
input_cols and skip_input_cols are mutually
exclusive. Mixing them raises an error: the two flags express
contradictory intentions and the call should clarify.
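The default column-detection rule described above can be approximated in a few lines of base R. This is a sketch under stated assumptions: looks_like_input_col() is a hypothetical name, and genproc's actual rules may differ in detail.

```r
# Hypothetical re-implementation of the described heuristic: a column
# qualifies if it is character and every non-NA value contains a path
# separator and points to an existing regular file (not a directory).
looks_like_input_col <- function(x) {
  if (!is.character(x)) return(FALSE)
  vals <- x[!is.na(x)]
  if (length(vals) == 0L) return(FALSE)
  has_sep <- grepl("[/\\\\]", vals)
  is_file <- file.exists(vals) & !dir.exists(vals)
  all(has_sep & is_file)
}

# A throwaway directory with one file, just for the demonstration.
demo_dir <- file.path(tempdir(), "heuristic-demo")
dir.create(demo_dir, showWarnings = FALSE)
writeLines("x", file.path(demo_dir, "a.txt"))

demo_mask <- data.frame(
  full_path = file.path(demo_dir, "a.txt"),  # full path to a file
  bare_name = "a.txt",                       # no separator
  a_dir     = demo_dir,                      # a directory
  n         = 1L,                            # not character
  stringsAsFactors = FALSE
)

detected <- vapply(demo_mask, looks_like_input_col, logical(1))
names(detected)[detected]
```

Only full_path qualifies here, which mirrors why the src_dir / src_file mask yields an empty files table.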
diff_inputs()
# Rewrite a.csv with strictly more content (size changes)
write.csv(iris, file.path(src_dir, "a.csv"), row.names = FALSE)
run1 <- genproc(do_one, mask_paths)
diff_inputs(run0, run1)
#> genproc input diff (method: stat)
#> Changed: 1
#> Unchanged: 2
#> Removed: 0
#> Added: 0
#> Cases affected: 1
#>
#> Changed files:
#> C:/Users/rheri/AppData/Local/Temp/RtmpM78eGk/genproc-vignette-src/a.csv
#> size: 221 B -> 4.1 KB
#> mtime: 2026-05-07 21:14:19 -> 2026-05-07 21:14:19
#>
#> Cases affected (use rerun_affected() to re-run):
#> case_0001
diff_inputs() returns an S3 object
(genproc_input_diff) with a print method for human reading
and named list components for programmatic access
($changed, $unchanged, $removed,
$added). Files are matched by canonical absolute path;
cross-machine comparison would need a separate matcher and is out of
scope for this version.
diff_inputs() refuses to compare snapshots produced with
different methods (forward-compatible with a future hash
mode).
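To make the stat-based comparison concrete, the following base-R sketch fingerprints files by size and mtime and classifies paths between two snapshots. stat_files() and stat_diff() are hypothetical helpers written for this illustration; they are not genproc's implementation.

```r
# Hypothetical helpers, not genproc's implementation.

# One row per unique canonical path, with size and last-modified time.
stat_files <- function(paths) {
  paths <- unique(normalizePath(paths, winslash = "/", mustWork = FALSE))
  info <- file.info(paths)
  data.frame(
    path  = paths,
    size  = info$size,
    mtime = info$mtime,
    stringsAsFactors = FALSE
  )
}

# Classify paths by comparing two snapshots on (size, mtime).
stat_diff <- function(before, after) {
  both <- intersect(before$path, after$path)
  b <- before[match(both, before$path), ]
  a <- after[match(both, after$path), ]
  same <- (b$size == a$size) & (b$mtime == a$mtime)
  same[is.na(same)] <- FALSE  # a missing file counts as changed
  list(
    changed   = both[!same],
    unchanged = both[same],
    removed   = setdiff(before$path, after$path),
    added     = setdiff(after$path, before$path)
  )
}

f1 <- tempfile(fileext = ".csv")
f2 <- tempfile(fileext = ".csv")
writeLines("a,b", f1)
writeLines("a,b", f2)

snap0 <- stat_files(c(f1, f2, f1))  # the duplicate path collapses to one row
writeLines(c("a,b", "1,2"), f1)     # grow f1 so that its size changes
snap1 <- stat_files(c(f1, f2))

d <- stat_diff(snap0, snap1)
basename(d$changed)
```

The size change is what makes the detection reliable in this demo: two writes within the same second can share an mtime, as the printed diff above shows.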
A case that throws does not stop the run. Here we delete a source file between two runs:
file.remove(file.path(src_dir, "b.csv"))
#> [1] TRUE
result_broken <- genproc(convert, mask)
#> Warning in file(file, "rt"): cannot open file
#> 'C:\Users\rheri\AppData\Local\Temp\RtmpM78eGk/genproc-vignette-src/b.csv': No
#> such file or directory
result_broken$n_success
#> [1] 2
result_broken$n_error
#> [1] 1
The failing row carries the error message and a filtered traceback.
The errors() helper subsets log on
success == FALSE:
bad <- errors(result_broken)
bad$error_message
#> [1] "cannot open the connection"
cat(bad$traceback[1], "\n")
#> 1. read.csv(file.path(src_dir, src_file))
#> 2. read.table(file = file, header = header, sep = sep, quote = quote, dec = dec, fill = fill, comment.char = comment.ch ...
#> 3. file(file, "rt")
The traceback is captured via withCallingHandlers() at
the moment the error is thrown: it is the real R call stack, not a
string pulled out of conditionMessage(). The internal
tryCatch() and withCallingHandlers() frames
are filtered out so the trace reads like a normal R error.
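The capture technique described above can be sketched in base R as follows. capture_case() is a hypothetical name, and this version does not filter the internal frames the way the package is described as doing.

```r
# Hypothetical sketch: run f(...) and, on error, record the call stack
# as it exists at the moment the error is signalled.
capture_case <- function(f, ...) {
  trace <- NULL
  result <- tryCatch(
    withCallingHandlers(
      f(...),
      error = function(e) {
        # The failing frames are still on the stack inside a calling handler.
        calls <- as.list(sys.calls())
        trace <<- vapply(
          calls,
          function(cl) paste(deparse(cl), collapse = " "),
          character(1)
        )
      }
    ),
    error = function(e) e  # the stack is unwound by the time this runs
  )
  if (inherits(result, "error")) {
    list(
      success = FALSE,
      error_message = conditionMessage(result),
      traceback = trace
    )
  } else {
    list(success = TRUE, error_message = NA_character_, traceback = NULL)
  }
}

failing <- function(x) stop("boom")
out <- capture_case(failing, 1)
out$error_message
```

The exiting handler in tryCatch() only sees the condition object, which is why the stack has to be read in the calling handler first.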
Restore the file for subsequent sections:
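The restoring chunk did not survive in this copy of the vignette. A plausible reconstruction, assuming the file is simply re-created the same way as in the setup at the top (same directory name, same head(mtcars) content), would be:

```r
# Assumption: mirrors the setup chunk at the top of the vignette.
src_dir <- file.path(tempdir(), "genproc-vignette-src")
dir.create(src_dir, showWarnings = FALSE, recursive = TRUE)

# Re-create the file that was deleted to provoke the error.
write.csv(head(mtcars), file.path(src_dir, "b.csv"), row.names = FALSE)
file.exists(file.path(src_dir, "b.csv"))
```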
Three helpers digest a genproc_result without touching
result$log directly.
errors(result) returns the subset of
failed cases as a data.frame, as in the bad <- errors(result_broken) call above.
summary(result) dispatches to
summary.genproc_result() and returns a printable digest
with the run status, success rate, duration statistics (mean / max /
slowest case_id) and the top recurring error messages:
summary(result_broken)
#> genproc result summary
#> Status : done
#> Cases : 3 (2 ok, 1 error)
#> Success : 67%
#> Total time : 0.02s
#> Per case : mean 0.000s, max 0.000s (slowest: case_0001)
#>
#> Top errors:
#> 1x cannot open the connection
The summary object is a list. The printed view is rendered by
print.genproc_result_summary() for human reading; the
underlying fields (status, success_rate,
duration_stats, top_errors) are
programmatically accessible too.
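For readers who want to see what such a digest amounts to, here is a base-R sketch that derives comparable figures from a log-shaped data.frame. The run_log values are made up and digest_log() is a hypothetical helper, not the package's summary method.

```r
# Hypothetical log with the columns described earlier (values made up).
run_log <- data.frame(
  case_id       = c("case_0001", "case_0002", "case_0003"),
  success       = c(TRUE, FALSE, TRUE),
  error_message = c(NA, "cannot open the connection", NA),
  duration_secs = c(0.01, 0.00, 0.03),
  stringsAsFactors = FALSE
)

# Hypothetical helper: success rate, slowest case, and error frequencies.
digest_log <- function(log_df) {
  errs <- log_df$error_message[!log_df$success]
  list(
    success_rate = mean(log_df$success),
    slowest      = log_df$case_id[which.max(log_df$duration_secs)],
    top_errors   = sort(table(errs), decreasing = TRUE)
  )
}

s <- digest_log(run_log)
s$slowest
```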
Two more helpers close the loop by re-running a targeted
subset. Their case_ids are local to the subset
(re-numbered starting at case_0001); the link back to the
original run is via the matching rows of
r0$reproducibility$mask_snapshot.
rerun_failed(r0, f) re-runs only the
cases that failed in r0 — useful after fixing the function
or the input:
# Re-run with a hardened f that handles the missing file gracefully.
result_fixed <- rerun_failed(
  result_broken,
  f = function(src_dir, src_file, dst_dir, dst_file) {
    in_path <- file.path(src_dir, src_file)
    if (!file.exists(in_path)) return(NA)
    df <- read.csv(in_path)
    saveRDS(df, file.path(dst_dir, dst_file))
  }
)
rerun_affected(r0, diff, f) re-runs
only the cases referenced by a diff_inputs() result. This is the
natural action after detecting that some upstream input files have
drifted.
This closes the reproducibility loop: detect drift, then re-run only the cases whose inputs changed, not the whole mask.
f and the mask from a working example
The vignette so far assumed both f and mask
were already written by hand. In practice you often start from a
working script for one specific case and want to derive the
parameterized function and the mask template automatically. Three
exported helpers do this, in order.
from_example_to_function(): example expression to function
Take an example expression that works on one case. Every external value
(string literals, environment symbols that are not functions) becomes a
parameter of the resulting function, with its current value stored as the
default. Locally bound symbols (assignment targets, function formals) are
protected.
# An example that works for ONE specific case
input_path <- file.path(src_dir, "a.csv")
output_path <- file.path(dst_dir, "a-from-example.rds")
example <- expression({
  df <- read.csv(input_path)
  saveRDS(df, output_path)
})
fn <- from_example_to_function(example)
formals(fn)
#> $param_1
#> [1] "C:\\Users\\rheri\\AppData\\Local\\Temp\\RtmpM78eGk/genproc-vignette-src/a.csv"
#>
#> $param_2
#> [1] "C:\\Users\\rheri\\AppData\\Local\\Temp\\RtmpM78eGk/genproc-vignette-dst/a-from-example.rds"
from_function_to_mask(): function signature to mask template
Once you have the function, derive a one-row template
data.frame that mirrors its signature. You can then
rbind() extra rows to build a full mask.
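The idea of a one-row template can be illustrated in base R by turning a function's default arguments into a data.frame. template_from_formals() and demo_fn are hypothetical and only handle scalar defaults; the package helper may behave differently.

```r
# Hypothetical sketch: one column per formal, one row holding the defaults.
# Assumes every formal has a length-one atomic default.
template_from_formals <- function(f) {
  defaults <- as.list(formals(f))
  as.data.frame(defaults, stringsAsFactors = FALSE)
}

demo_fn <- function(param_1 = "in/a.csv", param_2 = "out/a.rds") NULL

template <- template_from_formals(demo_fn)

# Extend the template into a full mask by binding extra rows.
template_mask <- rbind(
  template,
  data.frame(
    param_1 = "in/b.csv",
    param_2 = "out/b.rds",
    stringsAsFactors = FALSE
  )
)
template_mask
```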
rename_function_params(): give the parameters domain names
The auto-generated names (param_1, param_2,
…) are stable but not informative. Renaming updates the
formals and the body together; the function source
is not edited.
fn_named <- rename_function_params(
  fn, c(param_1 = "input_path", param_2 = "output_path")
)
formals(fn_named)
#> $input_path
#> [1] "C:\\Users\\rheri\\AppData\\Local\\Temp\\RtmpM78eGk/genproc-vignette-src/a.csv"
#>
#> $output_path
#> [1] "C:\\Users\\rheri\\AppData\\Local\\Temp\\RtmpM78eGk/genproc-vignette-dst/a-from-example.rds"
Putting it together: a renamed function plus a manually-built mask that follows the same column names.
mask_built <- data.frame(
  input_path = file.path(src_dir, c("a.csv", "b.csv", "c.csv")),
  output_path = file.path(dst_dir, c("a2.rds", "b2.rds", "c2.rds")),
  stringsAsFactors = FALSE
)
genproc(fn_named, mask_built)$n_success
#> [1] 3
The f_mapping argument shown in the next section is the
inline equivalent of step 3, convenient when you don’t need a renamed
function for any other purpose than this single genproc()
call.
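Updating formals and body together, as the renaming step does, can be sketched in base R with substitute(). rename_params_sketch() is a hypothetical illustration, not the package's code, and it is deliberately naive: it would also rename a local variable that happens to share an old parameter name.

```r
# Hypothetical sketch: rename formals and the matching symbols in the body.
# `mapping` is a named character vector, old name -> new name.
rename_params_sketch <- function(f, mapping) {
  old <- names(mapping)
  fmls <- formals(f)
  idx <- match(old, names(fmls))
  stopifnot(!anyNA(idx))
  names(fmls)[idx] <- unname(mapping)

  # Build list(old_name = as.name(new_name)) and substitute it into the body.
  subst <- lapply(unname(mapping), as.name)
  names(subst) <- old
  new_body <- do.call(substitute, list(body(f), subst))

  formals(f) <- fmls
  body(f) <- new_body
  f
}

demo <- function(param_1 = "abc", param_2 = 2L) nchar(param_1) + param_2
renamed <- rename_params_sketch(demo, c(param_1 = "text", param_2 = "offset"))

names(formals(renamed))
renamed(text = "abcd", offset = 1L)
```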
f_mapping
If the function you already have uses parameter names that don’t
match your mask’s column names, f_mapping renames them in
place without touching the source:
# `f` uses generic names; the mask uses domain names.
f <- function(input_dir, input_file, output_dir, output_file) {
  df <- read.csv(file.path(input_dir, input_file))
  saveRDS(df, file.path(output_dir, output_file))
}
genproc(
  f = f,
  mask = mask,
  f_mapping = c(
    "input_dir" = "src_dir",
    "input_file" = "src_file",
    "output_dir" = "dst_dir",
    "output_file" = "dst_file"
  )
)
parallel_spec() records intent only; the actual workers
are started lazily by future when the plan is resolved.
Across many genproc() calls in the same session,
installing the plan once amortizes worker startup.
When workers is passed without strategy,
genproc() auto-defaults to multisession for
that single call and restores the previous plan on exit. This avoids the
silent trap of workers = N being ignored because the
current plan is sequential.
multisession costs roughly 1–3 seconds per worker at
first use. Below ~10 seconds of real work, a parallel run can be slower
than a sequential one. Don’t read a single short benchmark as evidence
of a bug: run at workload size.
parallel_spec(seed = TRUE) (the default) draws
independent L’Ecuyer-CMRG streams from a random master seed. Identical
master seed — identical per-case RNG state regardless of worker count or
chunking. To pin the master, pass an integer:
parallel_spec(seed = 42L).
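The property stated above (same master seed, same per-case RNG state, whatever the worker count) follows from giving each case its own pre-computed stream. Below is a base-R sketch of that general technique using parallel::nextRNGStream(); per_case_streams() is a hypothetical helper and this is not necessarily how genproc or future.apply derive their streams.

```r
# Hypothetical sketch: derive one L'Ecuyer-CMRG stream per case from a
# master seed, then restore the caller's RNG state.
per_case_streams <- function(master_seed, n_cases) {
  had_seed <- exists(".Random.seed", envir = globalenv(), inherits = FALSE)
  old_seed <- if (had_seed) get(".Random.seed", envir = globalenv()) else NULL
  old_kind <- RNGkind()
  on.exit({
    do.call(RNGkind, as.list(old_kind))
    if (had_seed) assign(".Random.seed", old_seed, envir = globalenv())
  }, add = TRUE)

  set.seed(master_seed, kind = "L'Ecuyer-CMRG")
  stream <- get(".Random.seed", envir = globalenv())

  streams <- vector("list", n_cases)
  for (i in seq_len(n_cases)) {
    streams[[i]] <- stream
    stream <- parallel::nextRNGStream(stream)  # jump to the next stream
  }
  streams
}

streams_a <- per_case_streams(42L, 3)
streams_b <- per_case_streams(42L, 3)
identical(streams_a, streams_b)
```

Because each case's stream depends only on the master seed and the case index, it does not matter which worker ends up running the case.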
job <- genproc(convert, mask, nonblocking = nonblocking_spec())
status(job) # "running", "done (not collected)", "done", or "error"
job <- await(job) # blocks until resolution
job$log
Before await():
- log, n_success, n_error, duration_total_secs are NULL.
- reproducibility is already populated: the snapshot is captured
  synchronously, before the future is submitted, so it reflects t0.
- status is "running".
- attr(x, "future") is present.
After await():
- attr(x, "future") is removed.
- status is "done", or "error" if the wrapper future itself crashed (a
  rare case: per-case errors are caught by the logging layer and don’t
  propagate to the wrapper).
await() is idempotent. Calling it on an object that has
already been materialized (or was synchronous to begin with) returns it
unchanged. This makes it safe to pepper in user code without tracking
whether a particular result has already been collected.
"multisession"
Unlike parallel_spec(), nonblocking_spec()
defaults to strategy = "multisession", not
NULL. Rationale: a function named “non-blocking” must not
silently block because the current future::plan() is
sequential (the default in a fresh R session). Power-users
who manage their plan can pass strategy = NULL explicitly
to defer.
When nonblocking_spec() installs a plan, the previous
plan is not restored on genproc() exit —
it is restored by await() once the future has been
collected. Restoring earlier would call future::plan()
while the wrapper future is still running, which shuts down the
multisession workers and surfaces a “Future was canceled” error at
collection time. The trade-off is that if you never call
await(), the installed plan stays active for the rest of
the session. Power-users who pass strategy = NULL and
manage the plan themselves are not affected.
The two optional layers are orthogonal:
job <- genproc(
  convert, mask,
  parallel = parallel_spec(workers = 6),
  nonblocking = nonblocking_spec()
)
# get control back immediately
# ... do other work ...
job <- await(job)
The non-blocking layer launches one outer future. Inside it, the
parallel layer dispatches cases via future.apply. Note:
with both set to multisession, future.apply
detects it is already inside a future and degrades the inner layer to
sequential by default, to avoid worker explosion. For true
nested parallelism, install future::plan(list(...))
explicitly and pass strategy = NULL on both specs.
mc.cores in the wrapper subprocess
On Windows and in some RStudio configurations, the wrapper subprocess
inherits getOption("mc.cores") set to 1 (the
legacy default for parallel::mclapply(), which is a no-op
on Windows). Without intervention, parallelly would refuse
to spawn the inner workers because workers / 1 exceeds the
localhost hard limit, and the composed call would fail with a confusing
"only 1 CPU cores available for this R process (per 'mc.cores')"
error.
genproc() handles this transparently. In the composed
case (parallel != NULL && nonblocking != NULL), it
makes two adjustments inside the wrapper subprocess:
1. Sets R_PARALLELLY_AVAILABLECORES_METHODS = "system" so that parallelly
   ignores mc.cores and uses the true detected core count for its
   hard-limit check.
2. Raises options(mc.cores) from 1 to the system core count, so that
   parallelly’s soft-limit warning (“only 1 CPU cores available… 200%
   load”) does not fire with a misleading message after the hard limit
   has been lifted.
Both adjustments only apply if the user has not set their own values, and only inside the wrapper subprocess. The calling session is never modified.
genproc() integrates with the progressr
framework. Wrap the call in progressr::with_progress(...)
to opt in:
library(progressr)
with_progress(
  result <- genproc(my_fn, mask, parallel = parallel_spec(workers = 4))
)
Behind the scenes, genproc() emits one progression
condition per completed case. progressr lets the user pick
any handler: the default text bar in the console, an RStudio gadget,
audible beeps, custom log lines, or any handler the user wires up via
progressr::handlers().
Without with_progress(), the integration is a complete
no-op — zero overhead, zero visible change. progressr is a
soft dependency declared in Suggests; the integration is
skipped if the package is not installed.
In parallel mode, signals from worker subprocesses are propagated
back to the parent session via future.apply. Live
monitoring of non-blocking runs is not yet supported (signals would
arrive in a burst at await() time rather than live during
the run); this is on the roadmap.
Not yet in the package, but explicitly planned:
- The inputs layer uses a stat-based fingerprint (size + mtime), which
  detects ordinary file modifications but misses any change that preserves
  both, for example one made by an adversary. A method = "md5" (or
  "xxhash64") opt-in is reserved in the API and will land later.
- case_id: today case IDs are index-based. A content-based variant will
  make replay stable even if mask rows are reordered.
- replay(result, case_id) to rerun one failed case in isolation.
- progressr integration covers the sequential and parallel paths only; the
  non-blocking path needs a different design (collect progress in the
  background, query on demand) and is planned.
- cancel() for non-blocking: backend-dependent, deferred.
The architecture is designed so that adding these layers does not
require changes to existing user code: new layers are composed as extra
arguments to genproc(), and extra fields on
genproc_result accumulate without removing existing
ones.