This vignette demonstrates how to use create_join_plan() to automate data aggregation and merging workflows. The function creates executable data processing plans based on table metadata and user selections.
First, we will create metadata for two tables: customers and transactions using table_info().
# Define customer metadata
<- table_info(
customers_meta table_name = "customers",
source_identifier = "customers.csv",
identifier_columns = "customer_id",
key_outcome_specs = list(
list(OutcomeName = "CustomerCount",
ValueExpression = 1,
AggregationMethods = list(
list(AggregatedName = "CountByRegion",
AggregationFunction = "sum",
GroupingVariables = "region")
)
)
)
)
# Define transaction metadata
<- table_info(
transactions_meta "transactions",
"t.csv",
"tx_id",
key_outcome_specs = list(
list(OutcomeName = "Revenue",
ValueExpression = quote(price * quantity),
AggregationMethods = list(
list(AggregatedName = "RevenueByCustomer",
AggregationFunction = "sum",
GroupingVariables = "customer_id")
)
))
)
# Combine metadata
<- create_metadata_registry()
master_metadata <- add_table(master_metadata, customers_meta)
master_metadata #> Added metadata for table: customers
<- add_table(master_metadata, transactions_meta)
master_metadata #> Added metadata for table: transactions
Now, we create the plan. We will omit the join_map
to
show that the function can generate it automatically.
<- list(
user_selections customers = "region",
transactions = "RevenueByCustomer"
)<- create_join_plan(
plan base_table = "customers",
selections = user_selections,
metadata_dt = master_metadata
)
print(plan)
#> step operation target details
#> <num> <char> <char> <char>
#> 1: 1 AGGREGATE agg_transactions Aggregate 'transactions'
#> 2: 2 MERGE merged_step_2 Merge 'customers' with 'agg_transactions'
#> 3: 3 SELECT final_data Select final columns
#> code
#> <char>
#> 1: agg_transactions <- transactions[, .(RevenueByCustomer = sum(price * quantity)), by = .(customer_id)]
#> 2: merged_step_2 <- merge(x = customers, y = agg_transactions, by = c('customer_id'), all.x = TRUE)
#> 3: final_data <- merged_step_2[, .SD, .SDcols = c('region','RevenueByCustomer')]
A key feature of the planner is its ability to validate user requests. What happens if we ask for an aggregation that cannot logically be joined to our base table?
Let’s ask for RevenueByProduct
(grouped by
product_id
) to be joined to the customers
table (keyed by customer_id
). This is not a valid join.
# Add product metadata for this example
<- table_info("products", "p.csv", "product_id", list(list(OutcomeName="x",ValueExpression=1,AggregationMethods=list(list(AggregatedName="y",AggregationFunction="z",GroupingVariables="category")))))
products_meta <- table_info("transactions", "t.csv", "trans_id", list(
transactions_meta_v2 list(OutcomeName="Revenue", ValueExpression=quote(price*qty), AggregationMethods=list(
# This aggregation is by product_id, not customer_id
list(AggregatedName="RevenueByProduct", AggregationFunction="sum", GroupingVariables="product_id")
))
))<- rbindlist(list(customers_meta, products_meta, transactions_meta_v2))
invalid_metadata
# The invalid request
<- list(
invalid_selections customers = "customer_id",
transactions = "RevenueByProduct"
)
Instead of producing a faulty plan or a cryptic error,
create_join_plan
stops with a clear, informative
message.
create_join_plan(
base_table = "customers",
selections = invalid_selections,
metadata_dt = invalid_metadata
)#> Warning in create_join_plan(base_table = "customers", selections =
#> invalid_selections, : No direct path found from 'transactions' to 'customers'.
#> Skipping this table.'
#> step operation target details
#> <num> <char> <char> <char>
#> 1: 1 SELECT final_data Select final columns
#> code
#> <char>
#> 1: final_data <- customers[, .SD, .SDcols = c('customer_id')]
The reason this is invalid is that the join key of the selected aggregation does not match the join key of the base table.
customers
, whose
primary join key is customer_id
.transactions
table.product_id
.create_join_plan()
, correctly
sees that there is no direct path to join a table keyed by
product_id
to a table keyed on
customer_id
.This strict validation ensures that only logical and correct data manipulation plans are generated, preventing common data analysis errors.