Job Processor State Machine

The JobProcessor class implements a finite state machine (FSM) that orchestrates job execution across the distributed inference pipeline. This document describes each state, the conditions for transitions, and how the FSM integrates with the broader Language Pipes architecture.

Overview

When a job arrives at a node (via JobReceiver), it is processed by a JobProcessor instance. The processor validates the job context, routes computation through local or remote model segments, and handles job completion or handoff.
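
The overall control flow can be pictured as a loop that keeps calling a per-state handler until the machine reaches DONE. The sketch below is a minimal illustration, not the actual JobProcessor code. The `State` enum, the `JobProcessorSketch` class, the `handlers` mapping, and the `max_steps` guard are all hypothetical names chosen for this example.

```python
from enum import Enum, auto


class State(Enum):
    VALIDATING = auto()
    EMBED = auto()
    PROCESS_LAYERS = auto()
    HEAD = auto()
    SEND = auto()
    DONE = auto()


class JobProcessorSketch:
    """Hypothetical run loop: each handler returns the next State."""

    def __init__(self, handlers, max_steps=1000):
        # handlers maps a State to a zero-argument callable returning the next State
        self.handlers = handlers
        self.max_steps = max_steps  # guard against a handler that never reaches DONE
        self.state = State.VALIDATING
        self.trace = [self.state]

    def run(self):
        steps = 0
        while self.state is not State.DONE:
            if steps >= self.max_steps:
                raise RuntimeError("state machine did not reach DONE")
            self.state = self.handlers[self.state]()
            self.trace.append(self.state)
            steps += 1
        return self.trace


# Example: a job that validates, runs local layers, then hands off to another node.
proc = JobProcessorSketch({
    State.VALIDATING: lambda: State.PROCESS_LAYERS,
    State.PROCESS_LAYERS: lambda: State.SEND,
    State.SEND: lambda: State.DONE,
})
print([s.name for s in proc.run()])
# ['VALIDATING', 'PROCESS_LAYERS', 'SEND', 'DONE']
```

Having each handler return the next state keeps every transition decision local to the state that owns it, which matches how the per-state transition tables in this document are organized.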

States

VALIDATING

Purpose: Validate the job context before processing begins.

The FSM starts in this state for every job. It checks that all required resources are available:

  • The job object exists
  • The pipe is available and complete (all layer segments are ready)
  • For HEAD compute steps: the job's origin node is this node and the end model is loaded

Transitions:

| Condition | Next State |
| --- | --- |
| Job is missing | DONE |
| Pipe is unavailable or incomplete | DONE |
| Origin node mismatch (HEAD step) | DONE |
| End model unavailable (HEAD step) | DONE |
| Job is at HEAD step and prefill is complete | HEAD |
| Job is at HEAD step with more prefill chunks | EMBED |
| Job needs layer processing | PROCESS_LAYERS |
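
The VALIDATING checks and their transitions can be expressed as a single function that returns the next state. This is a sketch under assumptions: `JobView`, its fields, and the `validate` signature are hypothetical stand-ins for the real job context, and the checks simply follow the order of the transition table above.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional


class State(Enum):
    EMBED = auto()
    PROCESS_LAYERS = auto()
    HEAD = auto()
    DONE = auto()


@dataclass
class JobView:
    # Hypothetical, simplified view of the job fields VALIDATING inspects.
    compute_step: str       # e.g. "HEAD" or "LAYER"
    origin_node_id: str
    prefill_done: bool


def validate(job: Optional[JobView], pipe_ready: bool, node_id: str,
             end_model_loaded: bool) -> State:
    if job is None:
        return State.DONE                   # job is missing
    if not pipe_ready:
        return State.DONE                   # pipe unavailable or incomplete
    if job.compute_step == "HEAD":
        if job.origin_node_id != node_id:
            return State.DONE               # HEAD must run on the origin node
        if not end_model_loaded:
            return State.DONE               # end model unavailable
        # Back at the origin: either sample a token or embed the next prefill chunk.
        return State.HEAD if job.prefill_done else State.EMBED
    return State.PROCESS_LAYERS             # job needs layer processing


job = JobView(compute_step="HEAD", origin_node_id="node-a", prefill_done=False)
print(validate(job, pipe_ready=True, node_id="node-a", end_model_loaded=True).name)
# EMBED
```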

EMBED

Purpose: Embed the next token (or prefill chunk) to produce hidden states.

This state handles tokenization and embedding. For a new job, it tokenizes the prompt and initializes chunking. For a job that is continuing generation, it embeds the most recently generated token.

Operations:

  1. Tokenize prompt (if not already done)
  2. Initialize chunking for prefill (if applicable)
  3. Advance to the next chunk (if doing chunked prefill)
  4. Compute embedding via EndModel.compute_embed()
  5. Send prefill progress update (if chunking is active)

Transitions:

| Condition | Next State |
| --- | --- |
| Failed to send prefill update | DONE |
| No model available for next layer | DONE |
| Next layer is virtual/remote | SEND |
| Next layer is local | PROCESS_LAYERS |
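
Both EMBED and PROCESS_LAYERS end by asking which node hosts the layer the job needs next. A minimal sketch of that lookup, assuming the pipe can be described as a list of contiguous, inclusive layer ranges tagged with a hosting node and treating a virtual segment as one hosted elsewhere (the `Segment` type and `route_next_layer` function are hypothetical):

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Segment:
    start_layer: int   # first layer index (inclusive)
    end_layer: int     # last layer index (inclusive)
    node_id: str       # node hosting this segment


def route_next_layer(current_layer: int, segments: List[Segment], local_node_id: str) -> str:
    """Map the next required layer to the state the FSM should enter."""
    for seg in segments:
        if seg.start_layer <= current_layer <= seg.end_layer:
            # Local segments are processed here; anything else is handed off.
            return "PROCESS_LAYERS" if seg.node_id == local_node_id else "SEND"
    return "DONE"  # no model is available for this layer


pipe = [Segment(0, 15, "node-a"), Segment(16, 31, "node-b")]
print(route_next_layer(0, pipe, "node-a"))   # PROCESS_LAYERS
print(route_next_layer(16, pipe, "node-a"))  # SEND
print(route_next_layer(40, pipe, "node-a"))  # DONE
```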

PROCESS_LAYERS

Purpose: Process the job through locally-hosted model layers.

This state runs the hidden state through one or more consecutive local layer segments. Each segment processes its layer range and updates the job’s current_layer.

Operations:

  1. Get the local model segment for the current layer
  2. Call LlmModel.process_job() to run through the segment’s layers
  3. Refresh the last-update timestamp

Transitions:

| Condition | Next State |
| --- | --- |
| No local model available | DONE |
| Next layer segment is remote | SEND |
| Next layer segment is local | PROCESS_LAYERS |
| All layers complete (step becomes HEAD) | (determined by next iteration) |
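
One way to picture PROCESS_LAYERS is a loop that keeps applying local segments until the next layer is hosted elsewhere or the last layer has run. In this sketch the table's PROCESS_LAYERS self-loop is collapsed into a `while` loop, each segment's layers are replaced by a plain function standing in for LlmModel.process_job(), and `LocalSegment`, `Job`, and the `"LAYERS_COMPLETE"` sentinel (meaning the step has become HEAD and the next iteration picks the state) are all hypothetical.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class LocalSegment:
    start_layer: int                                  # first layer (inclusive)
    end_layer: int                                    # last layer (inclusive)
    forward: Callable[[List[float]], List[float]]     # stand-in for the segment's layers


@dataclass
class Job:
    hidden: List[float]
    current_layer: int = 0
    compute_step: str = "LAYER"
    last_update: float = field(default_factory=time.time)


def find_local(segments: List[LocalSegment], layer: int) -> Optional[LocalSegment]:
    return next((s for s in segments if s.start_layer <= layer <= s.end_layer), None)


def process_local_layers(job: Job, local_segments: List[LocalSegment], num_layers: int) -> str:
    """Run consecutive local segments and report why processing stopped."""
    seg = find_local(local_segments, job.current_layer)
    if seg is None:
        return "DONE"                      # no local model for the current layer
    while seg is not None:
        job.hidden = seg.forward(job.hidden)
        job.current_layer = seg.end_layer + 1
        job.last_update = time.time()      # refresh the last-update timestamp
        if job.current_layer >= num_layers:
            job.compute_step = "HEAD"      # all layers done; next iteration decides
            return "LAYERS_COMPLETE"
        seg = find_local(local_segments, job.current_layer)
    return "SEND"                          # next segment is hosted on another node


def double(h: List[float]) -> List[float]:
    return [2 * x for x in h]


job = Job(hidden=[1.0, 0.5])
local = [LocalSegment(0, 7, double), LocalSegment(8, 15, double)]
print(process_local_layers(job, local, num_layers=32), job.current_layer, job.hidden)
# SEND 16 [4.0, 2.0]
```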

HEAD

Purpose: Compute the output head to generate the next token.

This state handles the final projection and sampling step. It only runs on the origin node (the node that initiated the job and has the end model loaded).

Operations:

  1. Log prefill completion (if transitioning from prefill to decode)
  2. Compute RMS normalization via EndModel.compute_norm()
  3. Compute output head projection via EndModel.compute_head()
  4. Record timing statistics
  5. If the job is complete: set the result and mark it done
  6. If more tokens are needed: send an update to the client and continue
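
Steps 2 and 3 are the usual final stages of a decoder-only transformer. The internals of EndModel.compute_norm() and EndModel.compute_head() are not described here, so the sketch below assumes the textbook RMSNorm formula, y_i = w_i · h_i / sqrt(mean(h²) + ε), followed by a plain linear projection onto the vocabulary. Greedy argmax stands in for whatever sampling strategy the project actually uses, and all function names are hypothetical.

```python
import math
from typing import List, Sequence


def rms_norm(h: Sequence[float], weight: Sequence[float], eps: float = 1e-6) -> List[float]:
    # y_i = w_i * h_i / sqrt(mean(h^2) + eps)
    scale = 1.0 / math.sqrt(sum(x * x for x in h) / len(h) + eps)
    return [x * scale * w for x, w in zip(h, weight)]


def head_logits(h: Sequence[float], lm_head: Sequence[Sequence[float]]) -> List[float]:
    # One row of lm_head per vocabulary entry: logits[v] = dot(lm_head[v], h)
    return [sum(a * b for a, b in zip(row, h)) for row in lm_head]


def greedy_next_token(h: Sequence[float], norm_weight: Sequence[float],
                      lm_head: Sequence[Sequence[float]]) -> int:
    # Normalize, project to the vocabulary, then pick the highest-scoring token ID.
    logits = head_logits(rms_norm(h, norm_weight), lm_head)
    return max(range(len(logits)), key=logits.__getitem__)


hidden = [3.0, -4.0]
norm_w = [1.0, 1.0]
lm_head = [[1.0, 0.0], [0.0, -1.0], [0.5, 0.5]]    # toy 3-token vocabulary
print(greedy_next_token(hidden, norm_w, lm_head))   # 1
```

In a real model the returned token ID would then be compared against the EOS token to decide between marking the job done and continuing generation.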

Transitions:

| Condition | Next State |
| --- | --- |
| Job completed (EOS token generated) | DONE |
| Failed to send job update | DONE |
| More tokens to generate, next layer is local | EMBED |
| More tokens to generate, next layer is remote | SEND |
| More tokens to generate, next layer needs processing | PROCESS_LAYERS |

SEND

Purpose: Hand off the job to another node.

This state serializes the job and sends it to the node hosting the next layer segment (or back to the origin for HEAD computation).

Operations:

  1. Convert job to NetworkJob payload
  2. Determine destination:
    • If HEAD step: send to origin node
    • Otherwise: send to node hosting the next layer
  3. Send via Pipe.send_job()
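
The destination rule in step 2 is a single branch on the compute step. A sketch under stated assumptions: `NetworkJobSketch` and its fields are a hypothetical stand-in for the real NetworkJob payload, JSON is only an illustrative encoding, `layer_owner` is an assumed mapping from layer index to hosting node, and the actual transmission via Pipe.send_job() is omitted.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict, List


@dataclass
class NetworkJobSketch:
    # Hypothetical wire format; the real NetworkJob fields are not documented here.
    job_id: str
    origin_node_id: str
    compute_step: str
    current_layer: int
    hidden: List[float]


def choose_destination(job: NetworkJobSketch, layer_owner: Dict[int, str]) -> str:
    """HEAD jobs return to the origin; all others go to the next layer's host."""
    if job.compute_step == "HEAD":
        return job.origin_node_id
    return layer_owner[job.current_layer]


def to_payload(job: NetworkJobSketch) -> bytes:
    # Serialize the job for transmission (JSON chosen only for illustration).
    return json.dumps(asdict(job)).encode("utf-8")


owners = {layer: ("node-a" if layer < 16 else "node-b") for layer in range(32)}
job = NetworkJobSketch("job-1", "node-a", "LAYER", 16, [0.1, 0.2])
print(choose_destination(job, owners))  # node-b
job.compute_step = "HEAD"
print(choose_destination(job, owners))  # node-a
```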

Transitions:

| Condition | Next State |
| --- | --- |
| Handoff complete | DONE |

DONE

Purpose: Terminal state indicating this processing iteration is complete.

The job has either:

  • Completed successfully (all tokens generated)
  • Been handed off to another node
  • Encountered an error condition

State Transition Diagram

```
VALIDATING
├──(validation checks failed)────────────────────────────► DONE
├──(HEAD step, prefill done)─────────────────────────────► HEAD
├──(HEAD step, more prefill chunks)──────────────────────► EMBED
└──(needs layer processing)──────────────────────────────► PROCESS_LAYERS
HEAD
├──(job complete or update failed)───────────────────────► DONE
├──(more tokens, local embedding)────────────────────────► EMBED
├──(more tokens, remote layer)───────────────────────────► SEND
└──(more tokens, local layer)────────────────────────────► PROCESS_LAYERS
EMBED
├──(update failed or missing model)──────────────────────► DONE
├──(next layer is remote)────────────────────────────────► SEND
└──(next layer is local)─────────────────────────────────► PROCESS_LAYERS
PROCESS_LAYERS
├──(missing local model)─────────────────────────────────► DONE
├──(next layer is remote)────────────────────────────────► SEND
└──(next layer is local)─────────────────────────────────► PROCESS_LAYERS
SEND
└──(handoff complete)────────────────────────────────────► DONE
```
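
Because every edge in the diagram is explicit, it can be encoded as an adjacency map, which is handy for checking that a logged sequence of states is legal. The `TRANSITIONS` map and `is_valid_trace` helper are illustrative, transcribed from the diagram rather than taken from the codebase.

```python
from typing import Dict, List, Set

# Adjacency map transcribed from the diagram: state -> states it may move to.
TRANSITIONS: Dict[str, Set[str]] = {
    "VALIDATING": {"DONE", "HEAD", "EMBED", "PROCESS_LAYERS"},
    "HEAD": {"DONE", "EMBED", "SEND", "PROCESS_LAYERS"},
    "EMBED": {"DONE", "SEND", "PROCESS_LAYERS"},
    "PROCESS_LAYERS": {"DONE", "SEND", "PROCESS_LAYERS"},
    "SEND": {"DONE"},
    "DONE": set(),
}


def is_valid_trace(trace: List[str]) -> bool:
    """True if the trace starts at VALIDATING, ends at DONE, and follows only diagram edges."""
    if not trace or trace[0] != "VALIDATING" or trace[-1] != "DONE":
        return False
    return all(b in TRANSITIONS.get(a, set()) for a, b in zip(trace, trace[1:]))


print(is_valid_trace(["VALIDATING", "EMBED", "PROCESS_LAYERS", "SEND", "DONE"]))  # True
print(is_valid_trace(["VALIDATING", "SEND", "DONE"]))                              # False
```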

Compute Steps

The job’s compute_step field determines what operation is needed next:

| ComputeStep | Description | Processed By |
| --- | --- | --- |
| TOKENIZE | Convert messages to token IDs | End model (origin node) |
| EMBED | Embed tokens to hidden state | End model (origin node) |
| LAYER | Process through transformer layers | Layer segments (any node) |
| NORM | Apply final RMS normalization | End model (origin node) |
| HEAD | Project to vocabulary and sample | End model (origin node) |
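
A minimal sketch of the compute steps as an enum, assuming they advance in the order listed in the table and that HEAD loops back to EMBED while tokens remain (consistent with EMBED embedding the most recently generated token). The enum values, the `runs_on_origin` property, and `next_step` are hypothetical, and the sketch ignores the fact that LAYER repeats across segments until the last layer is done.

```python
from enum import Enum
from typing import Optional


class ComputeStep(Enum):
    TOKENIZE = "tokenize"
    EMBED = "embed"
    LAYER = "layer"
    NORM = "norm"
    HEAD = "head"

    @property
    def runs_on_origin(self) -> bool:
        # Per the table, only LAYER can run on any node; the rest need the end model.
        return self is not ComputeStep.LAYER


ORDER = [ComputeStep.TOKENIZE, ComputeStep.EMBED, ComputeStep.LAYER,
         ComputeStep.NORM, ComputeStep.HEAD]


def next_step(step: ComputeStep, more_tokens: bool) -> Optional[ComputeStep]:
    """Assumed progression: table order, with HEAD looping back to EMBED."""
    if step is ComputeStep.HEAD:
        return ComputeStep.EMBED if more_tokens else None
    return ORDER[ORDER.index(step) + 1]


step = ComputeStep.TOKENIZE
seen = [step.name]
for _ in range(5):
    step = next_step(step, more_tokens=True)
    seen.append(step.name)
print(seen)
# ['TOKENIZE', 'EMBED', 'LAYER', 'NORM', 'HEAD', 'EMBED']
```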

Integration Points

Entry Point

Jobs enter the processor via JobReceiver, which:

  1. Deserializes the NetworkJob payload
  2. Validates the job hash
  3. Creates a JobContext
  4. Instantiates JobProcessor and calls run()
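
The four entry steps can be sketched as a small receive function. Everything here is an assumption for illustration: the JSON envelope with `job` and `hash` keys, SHA-256 as the hash, and the `JobContextSketch`, `receive`, and `EchoProcessor` names. This document does not specify how NetworkJob is encoded or how its hash is computed.

```python
import hashlib
import json
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class JobContextSketch:
    # Hypothetical stand-in for the real JobContext.
    job: Dict[str, Any]
    node_id: str


def job_hash(data: Dict[str, Any]) -> str:
    # Assumption: SHA-256 over canonical (sorted-key) JSON of the job body.
    canonical = json.dumps(data, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def receive(payload: bytes, node_id: str,
            make_processor: Callable[[JobContextSketch], Any]) -> Optional[Any]:
    message = json.loads(payload.decode("utf-8"))           # 1. deserialize
    if job_hash(message["job"]) != message["hash"]:          # 2. validate the hash
        return None                                          #    reject the job
    context = JobContextSketch(job=message["job"], node_id=node_id)  # 3. build context
    return make_processor(context).run()                     # 4. instantiate and run


class EchoProcessor:
    """Toy processor used only to show the call sequence."""

    def __init__(self, ctx: JobContextSketch):
        self.ctx = ctx

    def run(self) -> str:
        return f"ran {self.ctx.job['id']} on {self.ctx.node_id}"


body = {"id": "job-1", "current_layer": 0}
good = json.dumps({"job": body, "hash": job_hash(body)}).encode("utf-8")
bad = json.dumps({"job": body, "hash": "0" * 64}).encode("utf-8")
print(receive(good, "node-a", EchoProcessor))  # ran job-1 on node-a
print(receive(bad, "node-a", EchoProcessor))   # None
```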

Exit Points

Jobs exit in three ways:

  1. Completion: Job reaches HEAD, generates EOS, result returned to client
  2. Handoff: Job sent to another node via Pipe.send_job()
  3. Error: Processing stops, job marked as failed