Synchronisation Layer
How Max syncs data from external sources into its local store.
Overview
The sync layer has three parts:
- Resolvers & Loaders define how to fetch data for each entity type
- Seeders & SyncPlans define what to sync and in what order
- The Executor turns a plan into a task graph and drains it
Loaders
A Loader fetches data from an external API and returns EntityInput values. Loaders receive an env parameter - a LoaderEnv that provides access to platform capabilities. Today this includes the connector context (env.ctx) and the operation executor (env.ops) - see Operations for details.
There are three variants:
| Variant | Signature | Use case |
|---|---|---|
| entity | (ref) => EntityInput | Fetch one entity at a time |
| entityBatched | (refs[]) => Batch<EntityInput> | Fetch many entities in one API call |
| collection | (parentRef, page) => Page<EntityInput> | Paginated child entities |
Entity Loader
Fetches fields for a single entity.
```ts
const TeamBasicLoader = Loader.entity({
  name: "acme:team:basic",
  context: AcmeAppContext,
  entity: AcmeTeam,

  async load(ref, env) {
    const team = await env.ctx.api.teams.get(ref.id);
    return EntityInput.create(ref, {
      name: team.name,
      description: team.description,
      owner: AcmeUser.ref(team.ownerId),
    });
  },
});
```
Batched Entity Loader
Fetches fields for many entities in a single API call. Preferred when the API supports batch retrieval.
```ts
const BasicUserLoader = Loader.entityBatched({
  name: "acme:user:basic",
  context: AcmeAppContext,
  entity: AcmeUser,

  async load(refs, env) {
    const users = await env.ctx.api.users.getBatch(refs.map(r => r.id));
    return Batch.buildFrom(
      users.map(u =>
        EntityInput.create(AcmeUser.ref(u.id), {
          name: u.name,
          email: u.email,
        })
      )
    ).withKey(input => input.ref);
  },
});
```
Collection Loader
Fetches a paginated list of child entities belonging to a parent.
```ts
const TeamMembersLoader = Loader.collection({
  name: "acme:team:members",
  context: AcmeAppContext,
  entity: AcmeTeam,
  target: AcmeUser,

  async load(ref, page, env) {
    const result = await env.ctx.api.teams.listMembers(ref.id, {
      cursor: page.cursor,
      limit: page.limit,
    });
    const items = result.members.map(m =>
      EntityInput.create(AcmeUser.ref(m.userId), {})
    );
    return Page.from(items, result.hasMore, result.nextCursor);
  },
});
```
Resolvers
A Resolver maps an entity’s fields to the loaders that populate them. Each field points to exactly one loader.
```ts
const AcmeTeamResolver = Resolver.for(AcmeTeam, {
  name: TeamBasicLoader.field("name"),
  description: TeamBasicLoader.field("description"),
  owner: TeamBasicLoader.field("owner"),
  members: TeamMembersLoader.field(),
});
```
Multiple fields can share a loader. When the executor needs to load name and description, it sees that both map to TeamBasicLoader and makes a single call.
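The single-call behaviour amounts to grouping the requested fields by the loader that populates them, then issuing one call per group. A minimal sketch (groupByLoader and the string loader names are illustrative, not part of the framework API):

```typescript
// Sketch only: collect requested fields by loader, so fields that share a
// loader are satisfied by a single fetch.
function groupByLoader(
  fieldToLoader: Record<string, string>,
  requested: string[],
): Map<string, string[]> {
  const groups = new Map<string, string[]>();
  for (const field of requested) {
    const loader = fieldToLoader[field];
    groups.set(loader, [...(groups.get(loader) ?? []), field]);
  }
  return groups;
}

// "name" and "description" share TeamBasicLoader, so one group covers both.
const groups = groupByLoader(
  { name: "TeamBasicLoader", description: "TeamBasicLoader", members: "TeamMembersLoader" },
  ["name", "description"],
);
```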
Seeder & SyncPlan
A Seeder bootstraps a sync from cold start. It stores an initial entity (the root) and returns a SyncPlan describing what to sync and in what order.
```ts
const AcmeSeeder = Seeder.create({
  context: AcmeAppContext,

  async seed(ctx, engine) {
    // Store the root entry point
    const rootRef = AcmeRoot.ref("root");
    await engine.store(EntityInput.create(rootRef, {}));

    return SyncPlan.create([
      Step.forRoot(rootRef).loadCollection("teams"),
      Step.forAll(AcmeTeam).loadFields("name", "description", "owner"),
      Step.forAll(AcmeTeam).loadCollection("members"),
      Step.forAll(AcmeUser).loadFields("name", "email"),
    ]);
  },
});
```
A SyncPlan is a sequence of steps. Each step has a target (which entities) and an operation (what to load).
Targets:
| Target | Meaning |
|---|---|
| forRoot(ref) | A single known root entity |
| forOne(ref) | A single known entity |
| forAll(EntityDef) | All entities of this type in the store |
Operations:
| Operation | Meaning |
|---|---|
| loadFields("a", "b") | Load the named fields via their resolvers |
| loadCollection("field") | Load a collection field (paginated) |
Steps run sequentially by default. Each step waits for the previous step (and all its children) to finish before starting. This matters because later steps often depend on entities discovered by earlier ones.
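The barrier between steps is essentially a sequential await; a minimal sketch (drainSequential is a hypothetical helper, not part of the executor):

```typescript
// Sketch: each step is awaited in full before the next starts.
// In the real executor a "step" also includes all of its spawned children.
async function drainSequential(steps: Array<() => Promise<string>>): Promise<string[]> {
  const completed: string[] = [];
  for (const runStep of steps) {
    completed.push(await runStep()); // barrier: nothing later starts until this resolves
  }
  return completed;
}
```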
Concurrent Steps
Steps that don’t depend on each other can run in parallel:
```ts
SyncPlan.create([
  Step.forRoot(rootRef).loadCollection("teams"),
  Step.concurrent([
    Step.forAll(AcmeTeam).loadFields("name"),
    Step.forAll(AcmeProject).loadFields("status"),
  ]),
]);
```
Execution
The SyncExecutor turns a SyncPlan into a task graph and processes it.
From Plan to Task Graph
The PlanExpander converts each step into one or more tasks, wiring up dependencies.
Tasks start in `new` (blocked) or `pending` (ready). When a blocking task completes, its dependents move from `new` to `pending`.
Task States
Dynamic Children
Tasks spawn child tasks at runtime. Both loadCollection and loadFields steps produce children:
- `forAll.loadCollection` creates one `load-collection` child per entity
- `forAll.loadFields` pages through refs and creates batched `load-fields` children (default: 25 refs per batch)
The parent moves to awaiting_children and completes when all children finish. Because children are independent tasks, the worker pool executes them concurrently.
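The ref batching mentioned above (default: 25 refs per `load-fields` child) is plain chunking; a sketch with a hypothetical chunk helper:

```typescript
// Sketch: split refs into batches, one batch per load-fields child task.
function chunk<T>(items: T[], size = 25): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// 60 refs at the default batch size yields batches of 25, 25, and 10.
const batches = chunk(Array.from({ length: 60 }, (_, i) => i));
```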
Collection loaders also paginate. If a page has more results, the task spawns a continuation child with the next cursor.
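Flattened into a loop, the continuation logic looks like this (drainPages and the page shape are illustrative; in the executor each iteration becomes a spawned child task carrying the cursor):

```typescript
type FakePage = { items: string[]; hasMore: boolean; nextCursor?: string };

// Sketch: follow nextCursor until hasMore is false.
async function drainPages(
  load: (cursor?: string) => Promise<FakePage>,
): Promise<string[]> {
  const all: string[] = [];
  let cursor: string | undefined;
  while (true) {
    const page = await load(cursor);
    all.push(...page.items);
    if (!page.hasMore) break;
    cursor = page.nextCursor; // a continuation child would be spawned here
  }
  return all;
}
```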
The Worker Pool
The executor runs a pool of 64 concurrent workers. Each worker independently claims tasks and executes them. A FlowController at the executor level gates how many tasks actually run in parallel (default: 50 concurrent tasks via a semaphore).
Workers coordinate through a Signal primitive rather than polling. When no tasks are available, a worker calls Signal.wait() and blocks at zero CPU cost. When a task completes, spawns children, or fails, it calls Signal.notifyAll() to wake all idle workers immediately.
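A wait/notify signal like this can be built directly on promises. A minimal sketch (not the actual Signal implementation):

```typescript
// Sketch: idle workers await wait(); notifyAll() resolves every waiter at once.
class Signal {
  private waiters: Array<() => void> = [];

  wait(): Promise<void> {
    return new Promise(resolve => this.waiters.push(resolve));
  }

  notifyAll(): void {
    const pending = this.waiters;
    this.waiters = []; // clear first so re-waits queue for the next notify
    for (const resolve of pending) resolve();
  }
}
```

Because waiting is just an unresolved promise, an idle worker consumes no CPU until someone calls notifyAll().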
When a task completes:
- Tasks blocked by it (`blockedBy`) move from `new` to `pending`
- If all siblings of a parent are complete, the parent completes too (walking up the parent chain iteratively)
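Both completion effects can be sketched as one function over an in-memory task map (the SketchTask shape is illustrative, not the framework's Task type):

```typescript
type TaskState = "new" | "pending" | "awaiting_children" | "completed";

interface SketchTask {
  state: TaskState;
  parent?: string;
  children: string[];
  blockedBy?: string;
}

// Sketch: on completion, unblock dependents, then walk the parent chain
// iteratively, completing each parent whose children are all done.
function onTaskComplete(tasks: Map<string, SketchTask>, id: string): void {
  tasks.get(id)!.state = "completed";

  // Tasks blocked by this one become ready.
  for (const task of tasks.values()) {
    if (task.state === "new" && task.blockedBy === id) task.state = "pending";
  }

  let current = tasks.get(id);
  while (current?.parent) {
    const parent = tasks.get(current.parent)!;
    if (parent.state !== "awaiting_children") break;
    if (!parent.children.every(c => tasks.get(c)!.state === "completed")) break;
    parent.state = "completed";
    current = parent; // repeat one level up
  }
}
```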
Two layers of flow control
Task execution is gated at two levels:
| Layer | What it limits | Default |
|---|---|---|
| Executor-level FlowController | How many tasks run concurrently | 50 (semaphore) |
| Operation-level Limit | How many API calls under a given Limit run concurrently | Per-connector (e.g., 50 for acme:api) |
The executor-level controller prevents overwhelming the system with too many concurrent tasks. The operation-level controller (enforced via middleware) prevents overwhelming external APIs. Both are FlowController implementations - the executor’s is configured at wiring time, the operation’s is declared per-connector via Limit.concurrent().
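Both layers reduce to the same primitive. A promise-based counting semaphore is a reasonable sketch of such a FlowController (illustrative, not the actual implementation):

```typescript
// Sketch: a counting semaphore; acquire() waits when all permits are taken.
class Semaphore {
  private queue: Array<() => void> = [];

  constructor(private permits: number) {}

  async acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return;
    }
    await new Promise<void>(resolve => this.queue.push(resolve));
  }

  release(): void {
    const next = this.queue.shift();
    if (next) next(); // hand the permit straight to a waiter
    else this.permits++;
  }
}

// Usage sketch: gate a task body the way the executor-level controller would.
async function gated<T>(sem: Semaphore, fn: () => Promise<T>): Promise<T> {
  await sem.acquire();
  try {
    return await fn();
  } finally {
    sem.release();
  }
}
```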
Error Handling
When a task fails, the drain loop marks it as failed and moves on. The loop exits when no tasks are pending or running - tasks left in new (blocked by a failed task) or awaiting_children (with a failed child) remain in those states.
The SyncResult reports both tasksCompleted and tasksFailed, so the caller can tell whether the sync was clean.
The stranded tasks preserve honest state: they weren’t cancelled or failed - they were never attempted. A future resume operation could retry the failed task, which would naturally unblock the rest.
SyncHandle
execute() returns a SyncHandle immediately. The sync runs in the background.
```ts
const handle = executor.execute(plan);

// Monitor
const status = await handle.status(); // "running" | "paused" | "completed" | ...

// Control
await handle.pause();
await handle.resume();
await handle.cancel();

// Wait for result
const result = await handle.completion();
// { status: "completed", tasksCompleted: 12, tasksFailed: 0, duration: 1540 }
```
End-to-End Example
Putting it all together for the Acme connector:
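As a non-authoritative sketch of the full wiring: SyncExecutor.create and its options are assumptions, and only execute(), the handle methods, and the types above actually appear in this document.

```ts
// Hypothetical wiring sketch; the factory name and its options are assumptions.
const executor = SyncExecutor.create({
  context: AcmeAppContext,
  resolvers: [AcmeTeamResolver], // plus a resolver per other entity type
  seeder: AcmeSeeder,            // seeds the root and produces the SyncPlan
});

const handle = executor.execute(plan); // plan as returned by the seeder
const result = await handle.completion();
if (result.tasksFailed > 0) {
  // the sync wasn't clean; stranded tasks remain for a future resume
}
```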
Limitations & Future Work
Not yet implemented
- Error recovery / resume - When a sync has failures, stranded tasks remain in honest states. A resume mechanism could retry failed tasks and naturally unblock the rest. The state model supports this; the trigger mechanism doesn’t exist yet.
- Cross-step deduplication - If step 3 discovers users u1, u2, u3 and step 4 also needs them, they’re loaded independently. Deduplication across steps would coalesce these into a single load.
Key Concepts Summary
| Concept | Role |
|---|---|
| EntityDef | Schema for an entity type (fields + types) |
| Ref | Typed pointer to an entity instance |
| Loader | Fetches data from an external API |
| Resolver | Maps entity fields to loaders |
| Seeder | Bootstraps a sync from cold start |
| SyncPlan | Ordered list of steps to execute |
| Step | Target (which entities) + operation (what to load) |
| Task | Serialisable unit of work in the execution layer |
| SyncExecutor | Orchestrates the worker pool and task lifecycle |
| TaskRunner | Executes a single task (calls loaders, stores results) |
| FlowController | Gates concurrency (at executor and operation levels) |
| Signal | Wait/notify primitive for worker coordination |
| TaskStore | Persists task state (supports restart/resume) |
| SyncHandle | Control and monitor a running sync |