Job Processor State Machine
The JobProcessor class implements a finite state machine (FSM) that orchestrates job execution across the distributed inference pipeline. This document describes each state, the conditions for transitions, and how the FSM integrates with the broader Language Pipes architecture.
Overview
When a job arrives at a node (via JobReceiver), it is processed by a JobProcessor instance. The processor validates the job context, routes computation through local or remote model segments, and handles job completion or handoff.
States
VALIDATING
Purpose: Validate the job context before processing begins.
The FSM starts in this state for every job. It checks that all required resources are available:
- The job object exists
- The pipe is available and complete (all layer segments are ready)
- For HEAD compute steps: the origin node matches and the end model is loaded
Transitions:
| Condition | Next State |
|---|---|
| Job is missing | DONE |
| Pipe is unavailable or incomplete | DONE |
| Origin node mismatch (for HEAD step) | DONE |
| End model unavailable (for HEAD step) | DONE |
| Job is at HEAD step and prefill is complete | HEAD |
| Job is at HEAD step with more prefill chunks | EMBED |
| Job needs layer processing | PROCESS_LAYERS |
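The table above can be read as a function from the job context to the next state. The following is a minimal sketch of that reading, not the actual JobProcessor code: the State enum, the ValidationInput dataclass, and all of its field names are hypothetical stand-ins for whatever the real job context exposes.

```python
from dataclasses import dataclass
from enum import Enum, auto


class State(Enum):
    VALIDATING = auto()
    EMBED = auto()
    PROCESS_LAYERS = auto()
    HEAD = auto()
    SEND = auto()
    DONE = auto()


@dataclass
class ValidationInput:
    # Hypothetical fields; the real job context is not shown in this document.
    job_present: bool
    pipe_complete: bool
    at_head_step: bool
    origin_matches: bool = True
    end_model_loaded: bool = True
    prefill_complete: bool = True


def validate(ctx: ValidationInput) -> State:
    """Return the next state, checking conditions in the table's order."""
    if not ctx.job_present or not ctx.pipe_complete:
        return State.DONE
    if ctx.at_head_step:
        # HEAD work is only valid on the origin node with the end model loaded.
        if not ctx.origin_matches or not ctx.end_model_loaded:
            return State.DONE
        return State.HEAD if ctx.prefill_complete else State.EMBED
    return State.PROCESS_LAYERS
```

Ordering the checks this way means every failure case exits to DONE before any routing decision is made.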
EMBED
Purpose: Embed the next token (or prefill chunk) to produce hidden states.
This state handles tokenization and embedding. For a new job, it tokenizes the prompt and initializes chunking. For a job that is already generating, it embeds the most recently generated token.
Operations:
- Tokenize prompt (if not already done)
- Initialize chunking for prefill (if applicable)
- Advance to the next chunk (if doing chunked prefill)
- Compute embedding via EndModel.compute_embed()
- Send prefill progress update (if chunking is active)
Transitions:
| Condition | Next State |
|---|---|
| Failed to send prefill update | DONE |
| No model available for next layer | DONE |
| Next layer is virtual/remote | SEND |
| Next layer is local | PROCESS_LAYERS |
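To make chunked prefill concrete, here is a small sketch of splitting a tokenized prompt into chunks that EMBED could then advance through one at a time. This document does not say how chunks are sized, so the fixed chunk_size parameter and the split_prefill helper are assumptions made for illustration.

```python
from typing import List


def split_prefill(token_ids: List[int], chunk_size: int) -> List[List[int]]:
    """Split token IDs into consecutive chunks of at most chunk_size tokens."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [
        token_ids[start:start + chunk_size]
        for start in range(0, len(token_ids), chunk_size)
    ]
```

Under this assumption, a five-token prompt with a chunk size of two yields three chunks, the last holding a single token, and each pass through EMBED would consume one of them.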
PROCESS_LAYERS
Purpose: Process the job through locally-hosted model layers.
This state runs the hidden state through one or more consecutive local layer segments. Each segment processes its layer range and updates the job’s current_layer.
Operations:
- Get the local model segment for the current layer
- Call LlmModel.process_job() to run through the segment’s layers
- Update the last update timestamp
Transitions:
| Condition | Next State |
|---|---|
| No local model available | DONE |
| Next layer segment is remote | SEND |
| Next layer segment is local | PROCESS_LAYERS |
| All layers complete (step becomes HEAD) | (determined by next iteration) |
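The way consecutive local segments are chained can be sketched as a loop over layer ranges. This is an illustrative model only: the Segment dataclass, its fields, and the string outcomes are hypothetical, and advancing current_layer stands in for the real call to LlmModel.process_job().

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Segment:
    start_layer: int  # first layer covered (inclusive)
    end_layer: int    # last layer covered (inclusive)
    is_local: bool


def find_segment(segments: List[Segment], layer: int) -> Optional[Segment]:
    """Return the segment whose range contains the given layer, if any."""
    for seg in segments:
        if seg.start_layer <= layer <= seg.end_layer:
            return seg
    return None


def run_local_layers(
    segments: List[Segment], current_layer: int, num_layers: int
) -> Tuple[int, str]:
    """Advance through consecutive local segments.

    Returns the new current_layer and the name of the next state.
    """
    while current_layer < num_layers:
        seg = find_segment(segments, current_layer)
        if seg is None:
            return current_layer, "DONE"  # no model available for this layer
        if not seg.is_local:
            return current_layer, "SEND"  # next segment lives on another node
        current_layer = seg.end_layer + 1  # stand-in for process_job()
    return current_layer, "HEAD"  # all layers complete
```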
HEAD
Purpose: Compute the output head to generate the next token.
This state handles the final projection and sampling step. It only runs on the origin node (the node that initiated the job and has the end model loaded).
Operations:
- Log prefill completion (if transitioning from prefill to decode)
- Compute RMS normalization via EndModel.compute_norm()
- Compute output head projection via EndModel.compute_head()
- Record timing statistics
- If job is complete: set result and mark done
- If more tokens needed: send update to client and continue
Transitions:
| Condition | Next State |
|---|---|
| Job completed (EOS token generated) | DONE |
| Failed to send job update | DONE |
| More tokens to generate, embedding is local | EMBED |
| More tokens to generate, next layer is remote | SEND |
| More tokens to generate, next layer is local | PROCESS_LAYERS |
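The completion check in HEAD can be illustrated with a toy sampling step. The sketch below assumes greedy decoding (taking the largest logit), which this document does not specify; greedy_token, head_step, and the eos_id parameter are hypothetical names, and the logits are a plain list rather than the output of EndModel.compute_head().

```python
from typing import List, Sequence


def greedy_token(logits: Sequence[float]) -> int:
    """Index of the largest logit (greedy decoding, an assumed strategy)."""
    return max(range(len(logits)), key=lambda i: logits[i])


def head_step(logits: Sequence[float], generated: List[int], eos_id: int) -> bool:
    """Append the chosen token and report whether the job is complete."""
    token = greedy_token(logits)
    generated.append(token)
    return token == eos_id
```

A True return corresponds to the "Job completed (EOS token generated)" row; a False return means the job continues with another token.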
SEND
Purpose: Hand off the job to another node.
This state serializes the job and sends it to the node hosting the next layer segment (or back to the origin for HEAD computation).
Operations:
- Convert job to NetworkJob payload
- Determine destination:
  - If HEAD step: send to origin node
  - Otherwise: send to node hosting the next layer
- Send via Pipe.send_job()
Transitions:
| Condition | Next State |
|---|---|
| Handoff complete | DONE |
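The destination rule can be sketched in a few lines. Everything here is hypothetical: the Job fields, the layer_hosts mapping from layer index to node ID, and the use of JSON as a wire format are assumptions made for illustration, and the real NetworkJob payload may look quite different.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict


@dataclass
class Job:
    # Hypothetical subset of job fields needed for routing.
    job_id: str
    origin_node_id: str
    current_layer: int
    at_head_step: bool


def choose_destination(job: Job, layer_hosts: Dict[int, str]) -> str:
    """HEAD work returns to the origin; otherwise go to the next layer's host."""
    if job.at_head_step:
        return job.origin_node_id
    return layer_hosts[job.current_layer]


def to_payload(job: Job) -> bytes:
    """Serialize the job to bytes (JSON is an assumed format, not NetworkJob's)."""
    return json.dumps(asdict(job), sort_keys=True).encode("utf-8")
```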
DONE
Purpose: Terminal state indicating this processing iteration is complete.
The job has either:
- Completed successfully (all tokens generated)
- Been handed off to another node
- Encountered an error condition
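Because every path ends in DONE, the processor can be modeled as a loop that calls one handler per state until DONE is reached. The following is a minimal sketch of that driver under stated assumptions: the State enum, the handlers mapping, and the max_steps guard are illustrative and are not taken from the JobProcessor source.

```python
from enum import Enum, auto
from typing import Callable, Dict, List


class State(Enum):
    VALIDATING = auto()
    EMBED = auto()
    PROCESS_LAYERS = auto()
    HEAD = auto()
    SEND = auto()
    DONE = auto()


def run(
    handlers: Dict[State, Callable[[], State]],
    start: State = State.VALIDATING,
    max_steps: int = 1000,
) -> List[State]:
    """Call the handler for each state until DONE; return the visited states."""
    trace = [start]
    state = start
    while state is not State.DONE:
        if len(trace) > max_steps:
            # Guard against a handler cycle that never reaches DONE.
            raise RuntimeError("state machine did not terminate")
        state = handlers[state]()
        trace.append(state)
    return trace
```

For example, a node that hosts only middle layers would typically trace VALIDATING, PROCESS_LAYERS, SEND, DONE for each job it receives.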
State Transition Diagram
VALIDATING
│
├──(missing job/context/pipe)────────────────────────────► DONE
│
├──(HEAD step, prefill done)─────────────────────────────► HEAD
│
├──(HEAD step, more prefill chunks)──────────────────────► EMBED
│
└──(needs layer processing)──────────────────────────────► PROCESS_LAYERS

HEAD
│
├──(job complete or update failed)───────────────────────► DONE
│
├──(more tokens, local embedding)────────────────────────► EMBED
│
├──(more tokens, remote layer)───────────────────────────► SEND
│
└──(more tokens, local layer)────────────────────────────► PROCESS_LAYERS

EMBED
│
├──(update failed or missing model)──────────────────────► DONE
│
├──(next layer is remote)────────────────────────────────► SEND
│
└──(next layer is local)─────────────────────────────────► PROCESS_LAYERS

PROCESS_LAYERS
│
├──(missing local model)─────────────────────────────────► DONE
│
├──(next layer is remote)────────────────────────────────► SEND
│
└──(next layer is local)─────────────────────────────────► PROCESS_LAYERS

SEND
│
└──(handoff complete)────────────────────────────────────► DONE

Compute Steps
The job’s compute_step field determines what operation is needed next:
| ComputeStep | Description | Processed By |
|---|---|---|
| TOKENIZE | Convert messages to token IDs | End model (origin node) |
| EMBED | Embed tokens to hidden state | End model (origin node) |
| LAYER | Process through transformer layers | Layer segments (any node) |
| NORM | Apply final RMS normalization | End model (origin node) |
| HEAD | Project to vocabulary and sample | End model (origin node) |
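The table can be encoded as an enum plus a lookup for where each step runs. This is a sketch only: the member names come from the table, but the string values, the ORIGIN_ONLY set, and the runs_on_origin helper are hypothetical and may not match the real ComputeStep definition.

```python
from enum import Enum


class ComputeStep(Enum):
    TOKENIZE = "tokenize"
    EMBED = "embed"
    LAYER = "layer"
    NORM = "norm"
    HEAD = "head"


# Steps the table assigns to the end model on the origin node.
ORIGIN_ONLY = frozenset(
    {ComputeStep.TOKENIZE, ComputeStep.EMBED, ComputeStep.NORM, ComputeStep.HEAD}
)


def runs_on_origin(step: ComputeStep) -> bool:
    """True if the step needs the end model, which lives on the origin node."""
    return step in ORIGIN_ONLY
```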
Integration Points
Entry Point
Jobs enter the processor via JobReceiver, which:
- Deserializes the NetworkJob payload
- Validates the job hash
- Creates a JobContext
- Instantiates JobProcessor and calls run()
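The hash validation step can be sketched with the standard library. This document does not state which hash JobReceiver uses or what bytes it covers, so SHA-256 over the raw payload is an assumption, and payload_digest and hash_matches are hypothetical helper names.

```python
import hashlib
import hmac


def payload_digest(payload: bytes) -> str:
    """Hex digest of the payload (SHA-256 is an assumed choice)."""
    return hashlib.sha256(payload).hexdigest()


def hash_matches(payload: bytes, claimed_hex: str) -> bool:
    """Compare the recomputed digest with the claimed one in constant time."""
    expected = payload_digest(payload).encode("utf-8")
    return hmac.compare_digest(expected, claimed_hex.encode("utf-8"))
```

A receiver built this way would reject the payload before creating any job context if the digests differ.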
Exit Points
Jobs exit in three ways:
- Completion: Job reaches HEAD, generates EOS, result returned to client
- Handoff: Job sent to another node via Pipe.send_job()
- Error: Processing stops, job marked as failed