effect-ai-streaming
from front-depiction/claude-setup
Reusable Claude Code configuration for Effect TypeScript projects with specialized agents and skills
npx skills add https://github.com/front-depiction/claude-setup --skill effect-ai-streaming
Effect AI Streaming
When to Use This Skill
- Real-time streaming responses from language models
- Building chat interfaces with incremental updates
- Managing conversation history with streaming
- Protecting concurrent stream operations
- Accumulating stream parts with side effects
- Converting stream responses to prompt history
Import Patterns
CRITICAL: Always use namespace imports:
import * as Stream from "effect/Stream"
import * as Effect from "effect/Effect"
import * as Channel from "effect/Channel"
import * as SubscriptionRef from "effect/SubscriptionRef"
import * as Match from "effect/Match"
import * as Response from "@effect/ai/Response"
StreamPart Protocol
stream := start → delta* → end
StreamPart lifecycle for each content type follows a three-phase protocol:
text :: text-start → text-delta* → text-end
reasoning :: reasoning-start → reasoning-delta* → reasoning-end
toolParam :: tool-params-start → tool-params-delta* → tool-params-end
finish :: { type: "finish", reason: FinishReason, usage: Usage }
Each streaming sequence has a unique id field that links start/delta/end parts.
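A minimal sketch of this lifecycle in plain TypeScript, grouping deltas by id so interleaved sequences never clobber each other. The simplified { type, id, delta } shape follows the descriptions above and is not the concrete Response part encoding:
type TextStreamPart =
  | { readonly type: "text-start"; readonly id: string }
  | { readonly type: "text-delta"; readonly id: string; readonly delta: string }
  | { readonly type: "text-end"; readonly id: string }
const buffers = new Map<string, string>()
const handleTextPart = (part: TextStreamPart): void => {
  switch (part.type) {
    case "text-start":
      // New sequence: allocate a buffer keyed by id
      buffers.set(part.id, "")
      break
    case "text-delta":
      // Append this delta to its sequence's buffer
      buffers.set(part.id, (buffers.get(part.id) ?? "") + part.delta)
      break
    case "text-end":
      // Sequence complete: buffers.get(part.id) now holds the full text block
      buffers.delete(part.id)
      break
  }
}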
Part Type Matching
Pattern match on stream parts using Match.value:
import * as Match from "effect/Match"
import * as Effect from "effect/Effect"
const processPart = (part: StreamPart) =>
Match.value(part).pipe(
Match.tag("text-delta", ({ delta }) =>
Effect.sync(() => console.log(delta))
),
Match.tag("reasoning-delta", ({ delta }) =>
Effect.sync(() => logReasoning(delta))
),
Match.tag("finish", ({ usage, reason }) =>
Effect.sync(() => recordUsage(usage, reason))
),
Match.orElse(() => Effect.void)
)
Type guards for stream parts (polymorphic over encoded/decoded):
isTextDelta :: ∀ P. HasType P ⇒ P → P is TextDelta
isToolCallPart :: ∀ P. HasType P ⇒ P → P is ToolCall
isFinishPart :: ∀ P. HasType P ⇒ P → P is Finish
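A hedged filtering sketch, assuming these guards are exported from @effect/ai/Response as the signatures above suggest. Stream.filter accepts a refinement, so the element type narrows for downstream operators:
import * as Stream from "effect/Stream"
import * as Response from "@effect/ai/Response"
// Assumption: `parts` is the stream of response parts from LanguageModel.streamText
declare const parts: Stream.Stream<any>
// Keep only text deltas; downstream sees the narrowed TextDelta type
const textDeltas = parts.pipe(Stream.filter(Response.isTextDelta))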
Accumulation Pattern
Accumulate stream parts incrementally using mutable state for efficiency:
import * as Stream from "effect/Stream"
import * as Effect from "effect/Effect"
import * as SubscriptionRef from "effect/SubscriptionRef"
import * as Prompt from "@effect/ai/Prompt"
const accumulated: Array<StreamPart> = []
let combined = Prompt.empty
stream.pipe(
Stream.mapChunksEffect(Effect.fnUntraced(function* (chunk) {
const parts = Array.from(chunk)
// Append to mutable accumulator
accumulated.push(...parts)
// Build prompt from accumulated parts
combined = Prompt.merge(combined, Prompt.fromResponseParts(parts))
// Update history incrementally
yield* SubscriptionRef.set(history, Prompt.merge(checkpoint, combined))
return chunk
}))
)
Key insight: Stream.mapChunksEffect enables side-effectful accumulation while preserving stream semantics.
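The accumulateAndUpdate helper referenced in the next section can be built directly from this pattern. A sketch, assuming `history` is a SubscriptionRef holding the conversation Prompt, `checkpoint` is the prompt captured when the stream started, and using the Prompt.merge / Prompt.fromResponseParts signatures listed later in this skill:
import * as Chunk from "effect/Chunk"
import * as Effect from "effect/Effect"
import * as SubscriptionRef from "effect/SubscriptionRef"
import * as Prompt from "@effect/ai/Prompt"
const makeAccumulator = (
  history: SubscriptionRef.SubscriptionRef<Prompt.Prompt>,
  checkpoint: Prompt.Prompt
) => {
  let combined = Prompt.empty
  // chunk: a Chunk of response StreamParts from the model stream
  return (chunk: Chunk.Chunk<any>) =>
    Effect.gen(function* () {
      // Fold this chunk into the running response prompt
      combined = Prompt.merge(combined, Prompt.fromResponseParts(Array.from(chunk)))
      // Publish checkpoint + accumulated response so subscribers see progress
      yield* SubscriptionRef.set(history, Prompt.merge(checkpoint, combined))
      // Pass the chunk through unchanged so downstream consumers still see it
      return chunk
    })
}
The result of makeAccumulator(history, checkpoint) plugs straight into Stream.mapChunksEffect, as in the protected stream below.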
Resource-Safe Streaming
Prevent concurrent stream operations using semaphore protection:
import * as Channel from "effect/Channel"
import * as Effect from "effect/Effect"
import * as Stream from "effect/Stream"
import * as SubscriptionRef from "effect/SubscriptionRef"
import * as Prompt from "@effect/ai/Prompt"
import * as LanguageModel from "@effect/ai/LanguageModel"
// `semaphore` is an Effect.Semaphore created with Effect.makeSemaphore(1);
// `history` is a SubscriptionRef holding the conversation Prompt
const streamWithProtection = Stream.fromChannel(
Channel.acquireUseRelease(
// Acquire: Take semaphore, get checkpoint
semaphore.take(1).pipe(
Effect.zipRight(SubscriptionRef.get(history)),
Effect.map((hist) => Prompt.merge(hist, newPrompt)),
Effect.tap((checkpoint) =>
SubscriptionRef.set(history, checkpoint)
)
),
// Use: Stream with accumulation
(checkpoint) => LanguageModel.streamText({ prompt: checkpoint }).pipe(
Stream.mapChunksEffect(accumulateAndUpdate),
Stream.toChannel
),
// Release: Always release semaphore
() => semaphore.release(1)
)
)
Resource acquisition order:
- Take semaphore (exclusive access)
- Get current history snapshot
- Merge with new prompt
- Update history with checkpoint
- Stream response (with incremental updates)
- Release semaphore (guaranteed via acquireUseRelease)
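A wiring sketch for the two resources this flow assumes, using the standard Effect.makeSemaphore and SubscriptionRef.make constructors (makeConversationState is a hypothetical name):
import * as Effect from "effect/Effect"
import * as SubscriptionRef from "effect/SubscriptionRef"
import * as Prompt from "@effect/ai/Prompt"
const makeConversationState = Effect.gen(function* () {
  // One permit: at most one in-flight streaming response per conversation
  const semaphore = yield* Effect.makeSemaphore(1)
  // Observable conversation history, starting empty
  const history = yield* SubscriptionRef.make(Prompt.empty)
  return { semaphore, history } as const
})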
Consumption Patterns
runForEach :: (A → Effect<_, E, R>) → Stream<A, E, R> → Effect<Unit, E, R>
runDrain :: Stream<A, E, R> → Effect<Unit, E, R>
runLast :: Stream<A, E, R> → Effect<Option<A>, E, R>
// Process each part with side effects
stream.pipe(
Stream.runForEach((part) =>
Match.value(part).pipe(
Match.tag("text-delta", ({ delta }) => updateUI(delta)),
Match.tag("finish", ({ usage }) => recordMetrics(usage)),
Match.orElse(() => Effect.void)
)
)
)
// Consume without collecting (memory efficient)
stream.pipe(
Stream.tap(logPart),
Stream.runDrain
)
// Fold all parts into a final accumulated value
stream.pipe(
  Stream.runFold(initialState, (acc, part) => merge(acc, part))
)
// Or take just the last emitted part as an Option
stream.pipe(Stream.runLast)
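Because the history lives in a SubscriptionRef, UI code can consume its changes stream to re-render on every incremental update. A sketch, assuming the history ref from the accumulation pattern above; renderConversation is a hypothetical UI callback:
import * as Effect from "effect/Effect"
import * as Stream from "effect/Stream"
import * as SubscriptionRef from "effect/SubscriptionRef"
import * as Prompt from "@effect/ai/Prompt"
declare const history: SubscriptionRef.SubscriptionRef<Prompt.Prompt>
declare const renderConversation: (prompt: Prompt.Prompt) => void
// `changes` emits the current history immediately, then every subsequent update
const renderLoop = history.changes.pipe(
  Stream.runForEach((prompt) => Effect.sync(() => renderConversation(prompt)))
)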
History Update Pattern
Incremental merge strategy for conversation history:
Prompt.merge :: Prompt → Prompt → Prompt
Prompt.fromResponseParts :: Array<StreamPart> → Prompt
// Pattern: checkpoint + incremental merge
let combined = Prompt.empty
...