Active Caching with Subscriptions
The passive caching pattern — fetching from the source on demand, expiring on a timer — works well for data that changes infrequently and where brief staleness is acceptable. But for data that changes often, a TTL is a blunt instrument: too short and you're making unnecessary upstream calls; too long and you're serving stale data.
Active caching solves this by inverting the flow. Instead of polling the source, your cache subscribes to it. When the source changes, it pushes the update directly into the Harper cache — instantly, without waiting for a TTL to expire. Records stay fresh until they actually change, and there's no background polling overhead.
In this guide you will implement an active cache for a live sports scoreboard feed. The source streams score updates as server-sent events; the Harper cache receives each update immediately and serves the current score to any number of downstream clients.
What You Will Learn
- How passive and active caching differ in architecture and trade-offs
- How to implement a subscribe method on a source Resource
- How to yield events from an async generator
- How to push events from a callback-based source using the subscription stream
- When to use put vs. invalidate events
- How to control which threads run the subscription
Prerequisites
- Completed Caching with Harper
- Familiarity with async generators in JavaScript
Passive vs. Active Caching
In the passive pattern, Harper drives the flow:
Client → Harper (cache miss or stale) → Source → Harper stores result → Client
The cache only knows data is stale when a client asks for it and the TTL has elapsed. Between TTL resets, the source can change any number of times and the cache has no idea.
In the active pattern, the source drives the flow:
Source changes → Source pushes event → Harper updates cache proactively
Client → Harper (always fresh) → Client
Harper receives every change the moment it happens. No TTL is needed — records stay cached indefinitely and are only replaced when the source says they changed.
| Aspect | Passive | Active |
|---|---|---|
| TTL required | Yes | No (optional as a fallback) |
| Staleness window | Up to TTL duration | Near-zero |
| Upstream calls | One per record per TTL interval | Only on actual changes |
| Source requirement | Simple get endpoint | Streaming or push-capable API |
| Complexity | Low | Moderate |
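To make the trade-offs in the table concrete, here is a minimal in-memory sketch of the two patterns. It is not how Harper implements caching; PassiveCache, ActiveCache, and the injectable clock are hypothetical names invented for illustration.

```javascript
// Hypothetical in-memory models of the two patterns; these are not Harper APIs.
class PassiveCache {
  constructor(fetchFromSource, ttlMs, now = Date.now) {
    this.fetchFromSource = fetchFromSource; // async (id) => value
    this.ttlMs = ttlMs;
    this.now = now; // injectable clock, so expiry can be simulated
    this.entries = new Map(); // id -> { value, storedAt }
    this.upstreamCalls = 0;
  }

  async get(id) {
    const entry = this.entries.get(id);
    // Fresh hit: serve the cached value, even if the source has since changed
    if (entry && this.now() - entry.storedAt < this.ttlMs) return entry.value;
    // Miss or expired: go upstream and restart the TTL
    this.upstreamCalls++;
    const value = await this.fetchFromSource(id);
    this.entries.set(id, { value, storedAt: this.now() });
    return value;
  }
}

class ActiveCache {
  constructor() {
    this.entries = new Map(); // id -> value
  }

  // The source calls this whenever a record changes
  onSourceEvent(id, value) {
    this.entries.set(id, value);
  }

  // Reads never go upstream in this simplified model
  get(id) {
    return this.entries.get(id);
  }
}
```

In the passive model, a change at the source stays invisible until the TTL elapses and a client asks again. In the active model, the change is visible on the very next read.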
Setting Up the Application
Clone the example repository and open it in your editor.
git clone https://github.com/HarperFast/active-caching-example.git harper-active-caching
The repository has the following structure:
harper-active-caching/
├── config.yaml
├── schema.graphql
└── resources.js
Start Harper in dev mode from inside the directory:
harper dev .
Defining the Cache Table
Open schema.graphql. The scoreboard cache table has no expiration — it stays valid until the source pushes an update:
type GameScore @table @export {
  id: ID @primaryKey # game ID, e.g. "game-001"
  homeTeam: String @indexed
  awayTeam: String @indexed
  homeScore: Int
  awayScore: Int
  status: String @indexed # "live", "final", "upcoming"
  lastUpdated: Long
}
Without expiration, records never go stale passively. The only way they update is when the source pushes a put or invalidate event — or when Harper calls get() on a cache miss for a record that hasn't been loaded yet.
Implementing the Active Source
Open resources.js. The scoreboardFeed source object connects to a hypothetical streaming API and yields score updates as Harper cache events.
// resources.js
const SCORES_API_BASE = process.env.SCORES_API_BASE ?? 'https://scores.example.com';

const scoreboardFeed = {
  async get(id) {
    // Called on cache miss: fetch the initial state for a specific game
    const response = await fetch(`${SCORES_API_BASE}/games/${id}`);
    if (!response.ok) {
      const error = new Error('Game not found');
      error.statusCode = 404;
      throw error;
    }
    return response.json();
  },

  async *subscribe() {
    // Called once to stream all ongoing updates into the cache
    const response = await fetch(`${SCORES_API_BASE}/stream`, {
      headers: { Accept: 'text/event-stream' },
    });
    if (!response.ok) throw new Error(`Stream request failed with status ${response.status}`);
    // Chunks are raw bytes and may end mid-line, so decode and buffer them
    const decoder = new TextDecoder();
    let buffer = '';
    for await (const chunk of response.body) {
      buffer += decoder.decode(chunk, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop(); // keep the trailing partial line for the next chunk
      for (const line of lines) {
        if (!line.startsWith('data: ')) continue;
        const event = JSON.parse(line.slice(6));
        yield {
          type: 'put',
          id: event.gameId,
          value: event.score,
          timestamp: event.ts,
        };
      }
    }
  },
};

tables.GameScore.sourcedFrom(scoreboardFeed);
get() and subscribe() have distinct roles:
- get(): handles cache misses. If a client asks for game-001 before the subscription has delivered it, Harper calls get() to fetch the initial state.
- subscribe(): streams all future updates. Harper calls this once at startup and propagates every yielded event into the cache automatically.
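The fiddliest part of subscribe() is parsing the event stream: streamed chunks are raw bytes and need not end on a line boundary. The sketch below isolates that step in a standalone helper so it can be exercised without a live stream. The name sseJsonEvents is hypothetical, and the helper assumes each update arrives as a single-line `data: ` field containing JSON, as in the example feed.

```javascript
// Hypothetical helper: turns an async iterable of byte chunks into parsed JSON
// payloads from SSE "data: " lines, buffering partial lines across chunks.
async function* sseJsonEvents(chunks) {
  const decoder = new TextDecoder();
  let buffer = '';
  for await (const chunk of chunks) {
    // stream: true keeps multi-byte characters intact across chunk boundaries
    buffer += decoder.decode(chunk, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop(); // last element is an incomplete line (possibly empty)
    for (const line of lines) {
      if (!line.startsWith('data: ')) continue; // skip blank lines, comments, other fields
      yield JSON.parse(line.slice(6));
    }
  }
}

// Mock stream whose first chunk ends in the middle of a JSON payload
async function* mockChunks() {
  const encoder = new TextEncoder();
  yield encoder.encode('data: {"gameId":"game-001","sc');
  yield encoder.encode('ore":{"homeScore":1}}\n\n: keepalive\ndata: {"gameId":"game-002"}\n');
}
```

Iterating `sseJsonEvents(mockChunks())` yields two parsed objects, one per complete data line, even though the first payload was split across two chunks.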
How Harper calls subscribe
Harper calls subscribe() once per process immediately after sourcedFrom is registered. The method should return an async iterable that yields events indefinitely; an async generator method such as async *subscribe() does this automatically. Harper does not call subscribe() per record: a single subscription covers the entire table.
Event Types
The type field on each yielded event controls how Harper applies the update:
// Replace the entire cached record with the new value
yield { type: 'put', id: 'game-001', value: { homeScore: 3, awayScore: 1, ... } };
// Tell Harper the record changed without sending the new value.
// Harper will evict the record; the next client request triggers a get() call.
yield { type: 'invalidate', id: 'game-001' };
// Remove the record from the cache
yield { type: 'delete', id: 'game-001' };
Use put when the event stream includes full record values — this is the most efficient path because Harper stores the value immediately without a follow-up get() call. Use invalidate when the stream only signals that something changed, and you want Harper to lazy-load the new value on demand.
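The difference between the event types can be sketched with a simplified model. This is an illustration only, not Harper's implementation: applyEvent and read are hypothetical names, the cache is a plain Map, loadFromSource stands in for the source's get(), and invalidation is modeled as a stale flag rather than a true eviction.

```javascript
// Simplified model of applying subscription events to a cache; not Harper's code.
function applyEvent(cache, event) {
  switch (event.type) {
    case 'put':
      // Full value supplied: store it, no follow-up fetch needed
      cache.set(event.id, { value: event.value, stale: false });
      break;
    case 'invalidate': {
      // New value unknown: flag the entry so the next read reloads it
      const entry = cache.get(event.id);
      if (entry) entry.stale = true;
      break;
    }
    case 'delete':
      cache.delete(event.id);
      break;
    default:
      throw new Error(`Unknown event type: ${event.type}`);
  }
}

// Read path: serve fresh entries, lazy-load missing or invalidated ones
async function read(cache, id, loadFromSource) {
  const entry = cache.get(id);
  if (entry && !entry.stale) return entry.value;
  const value = await loadFromSource(id);
  cache.set(id, { value, stale: false });
  return value;
}
```

In this model a put costs nothing extra on the read path, while an invalidate defers the cost to the first reader after the change. That is the trade-off to weigh when the upstream stream can carry either full values or bare change notifications.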
Using a Callback-Based Source
Not all sources use async iterables. If your upstream uses a callback or event-emitter API, use the default subscription stream instead of an async generator:
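// Assumes the `ws` package, whose sockets expose .on() for events
import WebSocket from 'ws';

// super.subscribe() only works in a class that extends Resource
// (Resource is available as a global in Harper resource modules)
class ScoreboardFeed extends Resource {
  subscribe() {
    const subscription = super.subscribe(); // default stream
    const socket = new WebSocket('wss://scores.example.com/ws');
    socket.on('message', (raw) => {
      const event = JSON.parse(raw.toString());
      subscription.send({
        type: 'put',
        id: event.gameId,
        value: event.score,
        timestamp: event.ts,
      });
    });
    socket.on('error', (err) => {
      subscription.error(err); // surfaces to Harper's error handling
    });
    return subscription;
  }
}

tables.GameScore.sourcedFrom(ScoreboardFeed);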
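Conceptually, a subscription stream of this kind is a bridge between push-style callbacks and a pull-style async iterable. The sketch below shows one way such a bridge can work. createPushStream is a hypothetical name, and this is not Harper's implementation, only an illustration of the send() and error() contract used above.

```javascript
// Hypothetical push-to-pull bridge: callbacks call send() or error(),
// and a consumer reads the events through async iteration.
function createPushStream() {
  const queue = []; // events sent before the consumer asked for them
  const waiters = []; // pending next() calls waiting for an event
  let failure = null;

  return {
    send(event) {
      const waiter = waiters.shift();
      if (waiter) waiter.resolve({ value: event, done: false });
      else queue.push(event);
    },
    error(err) {
      failure = err;
      // Reject every consumer that is currently waiting
      for (const waiter of waiters.splice(0)) waiter.reject(err);
    },
    [Symbol.asyncIterator]() {
      return {
        next() {
          // Deliver buffered events first, then surface any failure
          if (queue.length > 0) return Promise.resolve({ value: queue.shift(), done: false });
          if (failure) return Promise.reject(failure);
          return new Promise((resolve, reject) => waiters.push({ resolve, reject }));
        },
      };
    },
  };
}
```

Events sent while nobody is waiting are buffered, so a burst of callbacks is not lost. An error reaches the consumer as a rejected read, which is what lets the consuming side handle failures in one place.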
Configuring the Application
Open config.yaml:
graphqlSchema:
  files: 'schema.graphql'
rest: true
jsResource:
  files: 'resources.js'
- graphqlSchema loads schema.graphql and creates the GameScore table.
- rest exposes GameScore as an HTTP endpoint.
- jsResource loads resources.js, registers scoreboardFeed as the source for GameScore, and starts the subscription on startup.
Observing Active Updates
With Harper running, open two terminals. In the first, poll a game score every second:
Using curl:
watch -n1 'curl -s http://localhost:9926/GameScore/game-001 | jq .'
Using fetch:
setInterval(async () => {
  const data = await fetch('http://localhost:9926/GameScore/game-001').then((r) => r.json());
  console.log(data.homeScore, data.awayScore, data.status);
}, 1000);
In the second terminal, simulate a score update being pushed by the source (bypassing the stream for testing):
Using curl:
curl -X PUT 'http://localhost:9926/GameScore/game-001' \
  -H 'Content-Type: application/json' \
  -d '{"homeTeam":"Rangers","awayTeam":"Hawks","homeScore":3,"awayScore":2,"status":"live","lastUpdated":1712500000000}'
Using fetch:
await fetch('http://localhost:9926/GameScore/game-001', {
  method: 'PUT',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    homeTeam: 'Rangers',
    awayTeam: 'Hawks',
    homeScore: 3,
    awayScore: 2,
    status: 'live',
    lastUpdated: 1712500000000,
  }),
});
The first terminal will reflect the new score immediately: no TTL expiry, no cache miss, and no upstream call. The PUT request updated the cached record in place, which is the same effect a put event from the subscription has.
Controlling Subscription Threads
Harper runs multiple worker threads. By default, subscribe() runs on exactly one thread to prevent duplicate events and race conditions — if every thread opened its own connection to the source, every event would be processed multiple times.
In rare cases you may want subscriptions on multiple threads — for example, if your source shards data and each thread should subscribe to a different shard. Use subscribeOnThisThread to control this:
const scoreboardFeed = {
  subscribeOnThisThread(threadIndex) {
    return threadIndex === 0; // default: only thread 0
  },
  async *subscribe() { /* ... */ },
};
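For the sharded case, the predicate could allow the first few threads instead of only thread 0. The sketch below is illustrative: SHARD_COUNT and shardedFeed are hypothetical names, and how each thread would then select its own shard inside subscribe() depends on the source and is not shown.

```javascript
// Hypothetical sharded source: one subscription per shard, on the first
// SHARD_COUNT worker threads. SHARD_COUNT is an assumed value.
const SHARD_COUNT = 2;

const shardedFeed = {
  subscribeOnThisThread(threadIndex) {
    return threadIndex < SHARD_COUNT;
  },
};

// Simulate the decision across four worker threads (indexes 0 to 3)
const decisions = [0, 1, 2, 3].map((threadIndex) => shardedFeed.subscribeOnThisThread(threadIndex));
console.log(decisions);
```

With SHARD_COUNT set to 2, only thread indexes 0 and 1 would open a subscription; the remaining threads rely on the shared cache.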
Adding a TTL Fallback
Even with an active subscription, network interruptions can cause the connection to drop. You can add expiration to the table as a safety net — if the subscription fails and a record becomes stale, Harper will fall back to calling get():
type GameScore @table(expiration: 60) @export {
id: ID @primaryKey
...
}
With this in place, records are guaranteed to be at most 60 seconds stale even if the subscription connection drops.
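A TTL fallback limits staleness but does not restore the subscription itself. One complementary approach, sketched here under the assumption that retrying inside subscribe() is acceptable for your source, is to wrap the stream in a reconnect loop with capped exponential backoff. withReconnect and its options are hypothetical names, not part of Harper.

```javascript
// Hypothetical wrapper: re-opens a dropped stream with capped exponential backoff.
// `connect` is any function that returns an async iterable of events.
function defaultSleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function* withReconnect(connect, options = {}) {
  const {
    baseDelayMs = 500,
    maxDelayMs = 30000,
    sleep = defaultSleep, // injectable so tests do not have to wait
    onError = (err) => console.error('subscription stream failed:', err),
  } = options;

  let attempt = 0;
  while (true) {
    try {
      for await (const event of connect()) {
        attempt = 0; // a delivered event means the connection is healthy again
        yield event;
      }
    } catch (err) {
      onError(err);
    }
    // The stream ended or failed: back off, then reconnect
    const delay = Math.min(maxDelayMs, baseDelayMs * 2 ** attempt);
    attempt++;
    await sleep(delay);
  }
}
```

A source could then delegate with `yield* withReconnect(() => openStream())` inside its async *subscribe(), where openStream is a placeholder for whatever opens the upstream connection. Events missed while disconnected are not replayed by this wrapper, which is why the TTL fallback remains useful alongside it.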
Putting It All Together
Here is the complete resources.js:
// resources.js
const SCORES_API_BASE = process.env.SCORES_API_BASE ?? 'https://scores.example.com';

const scoreboardFeed = {
  async get(id) {
    const response = await fetch(`${SCORES_API_BASE}/games/${id}`);
    if (!response.ok) {
      const error = new Error('Game not found');
      error.statusCode = 404;
      throw error;
    }
    return response.json();
  },

  async *subscribe() {
    const response = await fetch(`${SCORES_API_BASE}/stream`, {
      headers: { Accept: 'text/event-stream' },
    });
    if (!response.ok) throw new Error(`Stream request failed with status ${response.status}`);
    // Chunks are raw bytes and may end mid-line, so decode and buffer them
    const decoder = new TextDecoder();
    let buffer = '';
    for await (const chunk of response.body) {
      buffer += decoder.decode(chunk, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop(); // keep the trailing partial line for the next chunk
      for (const line of lines) {
        if (!line.startsWith('data: ')) continue;
        const event = JSON.parse(line.slice(6));
        yield {
          type: 'put',
          id: event.gameId,
          value: event.score,
          timestamp: event.ts,
        };
      }
    }
  },
};

tables.GameScore.sourcedFrom(scoreboardFeed);
What Comes Next
This guide covered active caching with a push-based subscription. The Semantic Caching with Vector Indexing guide applies caching to AI-powered search — instead of keying the cache by exact ID, Harper finds semantically similar cached answers using vector similarity, so equivalent questions never hit the LLM twice.
Additional Resources
- Caching with Harper: foundational passive caching guide
- Resource API: sourcedFrom, subscribe, event types
- Database Schema: @table(expiration:) and eviction configuration