GitHub - gaffo/jorb: Job Processing Library · GitHub

gaffo/jorb


JORB - Concentrated Computing

Jorb is a Concentrated (as opposed to Distributed) workflow system. Single computers are BIG and FAST these days. You can do a lot with them, but there aren't many nice workflow composition systems that don't require all sorts of distributed craziness like a message queue and multiple nodes and processes.

Jorb is that system for golang.

I use it when I have complex batch jobs that need:

  • To not run in a massively complex distributed computing system
  • To be easy to debug and expand while in flight (who doesn't code in the debugger?)
  • Rate limiting per step (including adaptive AIMD rate limiting)
  • Concurrency controls per step
  • Fancy status bars (I've found that https://github.com/vbauerster/mpb makes a good status listener; one isn't provided currently)
  • Error recovery
  • State checkpointing
  • Very easy to stop and restart at whim

For instance, I'm currently using it to power large sets of code changes, where I need to check out packages, make changes, commit, trial build on a fleet, monitor status, fork state depending on build success or failure, cut a CR, send the CR via email/Slack/etc., and check status. Many of these steps are subject to service rate limits, or need sane rate limits so they don't overload the OS.

Quick Start

The unit tests in processor_test.go provide a pretty good example to get started, for instance https://github.com/gaffo/jorb/blob/main/processor_test.go#L111

See examples/aimd/main.go for a complete example using adaptive rate limiting.

Installation

To install Jorb, use:

go get github.com/gaffo/jorb

Features

Adaptive Rate Limiting (AIMD)

Jorb includes built-in adaptive rate limiting using AIMD (Additive Increase, Multiplicative Decrease) backoff:

states := []jorb.State[AC, OC, JC]{
    {
        TriggerState: "api-call",
        Exec:         makeAPICall,
        Concurrency:  10,
        RateLimit:    jorb.NewAIMDRateLimiter(100, 10, 200), // start, min, max req/s
    },
}

Return &jorb.RateLimitError{Err: err} from your exec function when rate limits are hit, and the processor will automatically back off and recover:

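func makeAPICall(ctx context.Context, ac AC, oc OC, jc JC) (JC, string, []jorb.KickRequest[JC], error) {
    // api.Call and isRateLimited stand in for your own client code.
    _, err := api.Call()
    if isRateLimited(err) {
        // Returning the same state retries the job; RateLimitError tells the limiter to back off.
        return jc, "api-call", nil, &jorb.RateLimitError{Err: err}
    }
    return jc, "done", nil, nil
}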

Use an AIMDRateLimiter as RateLimit so backoff applies. A plain *rate.Limiter still works for fixed limits, but returning RateLimitError without a BackoffRateLimiter only records the error (see the slog.Debug call in the processor).

Tune AIMD with NewAIMDRateLimiterWithConfig and AIMDRateLimiterConfig. Additive increase applies at most one +1 step per IncreaseInterval (default 1s), measured from the last applied increase. Pass a Clock to control time in tests. Concurrent multiplicative backoffs are coalesced by default; set DisableBackoffMerge to make every Backoff() call apply a full ×0.5.

The rate limiter automatically:

  • Backs off on rate-limit errors (×0.5 per effective backoff step, clamped to min)
  • Increases on success (+1 per increase interval while under max)
  • Stays within configured min/max bounds (initial/min/max are normalized, including min > max and non-positive inputs)

Per-State Timeout

Set State.Timeout to bound how long a single Exec invocation may run. When the deadline expires the worker routes the job to a configured FailState and records a "timed out after <duration>" entry in the job's StateErrors. The timeout applies per attempt, so a state that re-enqueues itself for retry receives a fresh window each time:

states := []jorb.State[AC, OC, JC]{
    {
        TriggerState: "fetch",
        Exec:         fetch,
        Concurrency:  10,
        Timeout: &jorb.Timeout{
            Duration:  30 * time.Second,
            FailState: "fetch-failed",
        },
    },
    {TriggerState: "fetch-failed", Terminal: true},
    {TriggerState: "done", Terminal: true},
}

A nil Timeout (the default) disables the deadline. NewProcessor rejects misconfigured timeouts at construction: zero/negative Duration, missing FailState, a FailState that is not in the registered state set, or a Timeout declared on a terminal state.

Outer cancellation of the context passed to Processor.Exec (for example, a Ctrl-C of the host process) is not reclassified as a timeout. The worker only routes to FailState when its own deadline elapses and the parent context is still healthy; otherwise the original error propagates so callers can distinguish operator-driven shutdown from a stuck job.

Concepts

When writing a program, you basically need the following: AC, OC, JC, Job, Run, States, StatusListener, persistence (JsonSerializer or NilSerializer), and a Processor.

AC: Application Context

This is a struct where you can pass job-specific application context to the jobs as they execute state transitions. Generally I put implementations or funcs here that let me call APIs, like AWS SDK clients.

This is usually ephemeral per Go process run: don't keep state here, just glue your app into it.

This is passed into each state exec function invocation

OC: Overall Context

This is the overall state for your job: store values here (generally immutable) that all of the jobs in a Run will need. I'll store things like a database name, file name, or work directory that is specific to that run.

This is passed into each state exec function invocation

JC: Job Context

This is per-job state that is specific to this workflow. Use a wide-table format here: a union of all of the fields that this workflow's states will need.

This is passed into each state exec function invocation. You mutate it locally and return the updated JC.

NOTE: These fields need to be JSON serializable.

Job

A job has a State (string) and a JC which contains your workflow specific state for each job. Jobs also track their state transitions, parent jobs, and errors per state.

Run

A run is a serializable group of jobs. Generally you create a run, add jobs to it, and fire it at a processor. Or you load a previous run with a serializer and fire that at a processor. It's meant to be super restartable.

You can also load up a run and spit out reports once it's been fully processed (or really at any time). It contains ALL of the state for a job other than the AC.
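Here's a minimal sketch of the job and run shapes described above, using hypothetical type and field names (not jorb's actual structs). Because a run is plain data, saving and reloading it is just JSON, which is what makes restarts and after-the-fact reports cheap.

```go
package main

import "encoding/json"

// Job is a hypothetical job shape: current state, workflow-specific context,
// where it came from, and what happened to it along the way.
type Job[C any] struct {
	ID          string              `json:"id"`
	State       string              `json:"state"`
	JC          C                   `json:"jc"`
	Parent      string              `json:"parent,omitempty"`
	Transitions []string            `json:"transitions,omitempty"` // states visited so far
	StateErrors map[string][]string `json:"state_errors,omitempty"`
}

// Run is a hypothetical run shape: the overall context plus every job.
// Only the AC is missing, since it is rebuilt each time the process starts.
type Run[O, C any] struct {
	Overall O                  `json:"overall"`
	Jobs    map[string]*Job[C] `json:"jobs"`
}

// Counts tallies jobs per state, the kind of report you can pull at any time.
func (r *Run[O, C]) Counts() map[string]int {
	counts := map[string]int{}
	for _, j := range r.Jobs {
		counts[j.State]++
	}
	return counts
}

// Save and Load are the restart path: everything needed to resume is in the Run.
func Save[O, C any](r *Run[O, C]) ([]byte, error) {
	return json.MarshalIndent(r, "", "  ")
}

func Load[O, C any](b []byte) (*Run[O, C], error) {
	var r Run[O, C]
	if err := json.Unmarshal(b, &r); err != nil {
		return nil, err
	}
	return &r, nil
}
```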

States

A State describes a possible state that a job can be in. A state has:

  • A TriggerState which is a string matching the state of the jobs you want this state to process
  • Exec: an optional ExecFunc that does the actual processing (more in a sec)
  • Terminal: if the state is terminal, it won't be processed, and a run is considered complete when all jobs are in terminal states. Fun note: you can toggle in code whether a state is terminal to patch up workflows or to stop certain actions (I turn terminal off in off hours so I don't send actual CRs, just all the pre-validation). flag.Bool works great for this.
  • Concurrency: the number of concurrent processors for this state; this is nice if the steps take a while, especially on network calls
  • RateLimit: a rate limiter (a plain *rate.Limiter or an AIMDRateLimiter) shared by all processors for this state; great if you are hitting a rate-limited API
  • Timeout: an optional *Timeout that bounds how long a single Exec invocation may run; on expiry the job routes to the configured FailState (see Per-State Timeout)

Typically you want to be pretty granular with your steps. For instance, in a recent workflow I have separate states for:

  • File modification
  • Git Add
  • Git Commit
  • Git Push
  • Git Fetch
  • Git rev-parse

This really just helps with debugging and "redriving", where you need to quickly send all of the jobs in one state back to another state to patch them up, especially when developing flows and handling errors.
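Redriving can be as simple as rewriting states in bulk. A minimal sketch, assuming a pared-down run represented as a map from job ID to current state; redrive and the state names are hypothetical.

```go
package main

// redrive moves every job currently in state `from` to state `to` and returns
// how many jobs moved. With granular states, `from` pins down exactly which
// step to replay, e.g. sending every "git-push-failed" job back to "git-push".
func redrive(jobs map[string]string, from, to string) int {
	n := 0
	for id, state := range jobs {
		if state == from {
			jobs[id] = to // overwriting existing keys during range is safe
			n++
		}
	}
	return n
}
```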

ExecFunc

This is the meat of the work. The general contract is that it is called with:

  • ctx - a context.Context
  • AC - application context
  • OC - overall context
  • JC - job context (your workflow-specific state)

And returns:

  • JC - updated with the state mutations that state did
  • NewState - the next state you want this job to go to. It's totally fine to go to the same state or to previous states. On errors I usually come back to the same state, which is basically a retry, or go to a separate terminal state for specific errors (this is good because it lets you easily redrive by editing the state file with find and replace).
  • optional []KickRequest - these are requests to spawn new jobs. This is how you fork a workflow. For instance, if you get data from a table and want to fire off a lot of S3 fetches, use this. It's fine to send the job that kicked everything else to a terminal state and do all the other work in the kicked jobs, or to just reuse it as the first of many. Kicked jobs get a job ID of the form ${parent_id}->${new_seq}.
  • error - This is logged on the job by state and will eventually have logic for retries and termination if there are too many

StatusListener

You can use a no-op implementation, but I hook this up to a map of progress bars per state to show my status.

You do have to pass one, though, because I'm too lazy to deal with nil.

Persistence (JsonSerializer)

Use NewJsonSerializer with a checkpoint file path and JsonSerializerConfig{}, pass the returned *JsonSerializer and *Run into NewProcessor, and call Close() on shutdown so the final checkpoint is written and incremental JSONL files are merged away. That gives incremental JSONL appends on each completion plus periodic full checkpoints.

If you do not need disk persistence, use NilSerializer as the persistence argument (no-op append/checkpoint).

Processor

This does all the work: construct it with an app context and states, then Exec a run. It blocks until processing finishes, invoking exec functions, JSON persistence, and the status listener as needed.

Other Notes

This is super alpha software. I cut a new v0.0.x point release for every breaking change.

There are several places where I just log.Fatal if the app is in an invalid state, especially if an unknown state string is found. The nice thing is that the app tries to checkpoint the run file state on every job completion, so you usually lose little information.

State saving: on that note, work definitely queues up before it gets serialized. I need to optimize this with batching so I'm not doing as many expensive saves.

Throughput: there's definitely room for improvement here. Too much work gets queued up for the processors, and too much stacks up on the return queue.

Metrics: I'd really like to get metrics on how long states take, how many are executing on average, and where the bottlenecks are. I think many of the states could have their concurrency adjusted dynamically for optimal performance (when you do memory-, disk-, or CPU-heavy jobs).

It uses slog but doesn't set up a default logger; you can fix this by configuring a file logger. It's VERY spammy if you don't.

Testing and refactoring are needed. It's getting better, but testing a system like this is complex, and I need to pull some of the major functions out into their own functions so I can test a lot of the edge cases without firing up a big job.
