effect-ai-streaming

from front-depiction/claude-setup

Reusable Claude Code configuration for Effect TypeScript projects with specialized agents and skills

10 stars · 4 forks · Updated Jan 19, 2026
npx skills add https://github.com/front-depiction/claude-setup --skill effect-ai-streaming

SKILL.md

Effect AI Streaming

When to Use This Skill

  • Real-time streaming responses from language models
  • Building chat interfaces with incremental updates
  • Managing conversation history with streaming
  • Protecting concurrent stream operations
  • Accumulating stream parts with side effects
  • Converting stream responses to prompt history

Import Patterns

CRITICAL: Always use namespace imports:

import * as Stream from "effect/Stream"
import * as Effect from "effect/Effect"
import * as Channel from "effect/Channel"
import * as SubscriptionRef from "effect/SubscriptionRef"
import * as Match from "effect/Match"
import * as Response from "@effect/ai/Response"

StreamPart Protocol

stream := start → delta* → end

Each streamed content type follows a three-phase lifecycle (start, zero or more deltas, end); the finish part is a single part rather than a sequence:

text      :: text-start → text-delta* → text-end
reasoning :: reasoning-start → reasoning-delta* → reasoning-end
toolParam :: tool-params-start → tool-params-delta* → tool-params-end
finish    :: { type: "finish", reason: FinishReason, usage: Usage }

Each streaming sequence has a unique id field that links start/delta/end parts.
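To make the id linkage concrete, here is a minimal, dependency-free sketch that reassembles text sequences from an interleaved list of parts. The `Part` type and the `assembleText` helper are hypothetical and simplified for illustration; the real `@effect/ai` parts carry more fields than shown here.

```typescript
// Hypothetical, simplified part shapes (assumption: only the fields used below).
type Part =
  | { type: "text-start"; id: string }
  | { type: "text-delta"; id: string; delta: string }
  | { type: "text-end"; id: string }
  | { type: "finish"; reason: string }

// Groups deltas by id and returns the text of every sequence that was
// both opened (text-start) and closed (text-end).
function assembleText(parts: ReadonlyArray<Part>): Map<string, string> {
  const pending = new Map<string, string>()
  const done = new Map<string, string>()
  for (const part of parts) {
    switch (part.type) {
      case "text-start":
        pending.set(part.id, "")
        break
      case "text-delta": {
        const current = pending.get(part.id)
        // Deltas for a sequence that was never opened are ignored.
        if (current !== undefined) pending.set(part.id, current + part.delta)
        break
      }
      case "text-end": {
        const text = pending.get(part.id)
        if (text !== undefined) {
          done.set(part.id, text)
          pending.delete(part.id)
        }
        break
      }
      case "finish":
        break
    }
  }
  return done
}

const sample: Array<Part> = [
  { type: "text-start", id: "a" },
  { type: "text-delta", id: "a", delta: "Hel" },
  { type: "text-delta", id: "a", delta: "lo" },
  { type: "text-end", id: "a" },
  { type: "finish", reason: "stop" }
]
const assembled = assembleText(sample)
```

Keying the accumulator on `id` is what lets several sequences (for example text and reasoning) interleave in one stream without mixing their deltas.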

Part Type Matching

Pattern match on stream parts using Match.value:

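import * as Effect from "effect/Effect"
import * as Match from "effect/Match"

// StreamPart is assumed to be the stream part union from "@effect/ai/Response".
// Parts are discriminated by their `type` field (see the finish part above),
// not by `_tag`, so match with Match.discriminator("type") rather than Match.tag.
// logReasoning and recordUsage are application-defined helpers.
const processPart = (part: StreamPart) =>
  Match.value(part).pipe(
    Match.discriminator("type")("text-delta", ({ delta }) =>
      Effect.sync(() => console.log(delta))
    ),
    Match.discriminator("type")("reasoning-delta", ({ delta }) =>
      Effect.sync(() => logReasoning(delta))
    ),
    Match.discriminator("type")("finish", ({ usage, reason }) =>
      Effect.sync(() => recordUsage(usage, reason))
    ),
    Match.orElse(() => Effect.void)
  )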

Type guards for stream parts (polymorphic over encoded/decoded):

isTextDelta :: ∀ P. HasType P ⇒ P → P is TextDelta
isToolCallPart :: ∀ P. HasType P ⇒ P → P is ToolCall
isFinishPart :: ∀ P. HasType P ⇒ P → P is Finish
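The signatures above can be read as generic TypeScript type predicates over any value with a `type` field. The following is a minimal sketch under that reading; `HasType`, `DemoPart`, and both guard implementations are hypothetical local definitions written for illustration, not the library's own exports.

```typescript
// Hypothetical minimal constraint shared by encoded and decoded parts.
interface HasType {
  readonly type: string
}

// Generic guards: narrow any P that carries a `type` discriminant.
const isTextDelta = <P extends HasType>(
  part: P
): part is P & { readonly type: "text-delta" } => part.type === "text-delta"

const isFinishPart = <P extends HasType>(
  part: P
): part is P & { readonly type: "finish" } => part.type === "finish"

// Simplified decoded shapes, for illustration only.
type DemoPart =
  | { readonly type: "text-delta"; readonly id: string; readonly delta: string }
  | { readonly type: "finish"; readonly reason: string }

const demoParts: ReadonlyArray<DemoPart> = [
  { type: "text-delta", id: "t1", delta: "Hi" },
  { type: "text-delta", id: "t1", delta: "!" },
  { type: "finish", reason: "stop" }
]

let joined = ""
let finishReason: string | undefined
for (const part of demoParts) {
  // Each guard narrows `part`, so `delta` and `reason` are accessible.
  if (isTextDelta(part)) joined += part.delta
  else if (isFinishPart(part)) finishReason = part.reason
}
```

Because the guards only constrain `P` to have a `type` field, the same functions work for any representation of a part that keeps that discriminant.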

Accumulation Pattern

Accumulate stream parts incrementally using mutable state for efficiency:

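import * as Stream from "effect/Stream"
import * as Effect from "effect/Effect"
import * as SubscriptionRef from "effect/SubscriptionRef"
import * as Prompt from "@effect/ai/Prompt"

// Assumes `stream`, `history` (a SubscriptionRef), and `checkpoint` are in scope.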

const accumulated: Array<StreamPart> = []
let combined = Prompt.empty

stream.pipe(
  Stream.mapChunksEffect(Effect.fnUntraced(function* (chunk) {
    const parts = Array.from(chunk)

    // Append to mutable accumulator
    accumulated.push(...parts)

    // Build prompt from accumulated parts
    combined = Prompt.merge(combined, Prompt.fromResponseParts(parts))

    // Update history incrementally
    yield* SubscriptionRef.set(history, Prompt.merge(checkpoint, combined))

    return chunk
  }))
)

Key insight: Stream.mapChunksEffect enables side-effectful accumulation while preserving stream semantics.
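The pass-through idea can be illustrated without Effect at all. The sketch below is a dependency-free analogue, not the Effect implementation: a plain generator of chunks stands in for the stream, and the hypothetical `tapChunks` helper plays the role of `Stream.mapChunksEffect` by running a side effect per chunk and yielding the chunk unchanged.

```typescript
// Runs `onChunk` for every chunk, then passes the chunk through untouched.
function* tapChunks<A>(
  chunks: Iterable<ReadonlyArray<A>>,
  onChunk: (chunk: ReadonlyArray<A>) => void
): Generator<ReadonlyArray<A>> {
  for (const chunk of chunks) {
    onChunk(chunk) // side effect: update the accumulator
    yield chunk // downstream sees exactly what upstream produced
  }
}

const accumulatedText: Array<string> = []
const snapshots: Array<string> = []
const source: Array<ReadonlyArray<string>> = [["Hel", "lo"], [", "], ["world"]]

const downstream: Array<string> = []
for (const chunk of tapChunks(source, (c) => {
  accumulatedText.push(...c)
  // Record the running total once per chunk, like an incremental history update.
  snapshots.push(accumulatedText.join(""))
})) {
  downstream.push(...chunk)
}
```

Working per chunk rather than per element means the running total is published once per batch of parts, while the consumer still receives every part.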

Resource-Safe Streaming

Prevent concurrent stream operations using semaphore protection:

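import * as Channel from "effect/Channel"
import * as Effect from "effect/Effect"
import * as Stream from "effect/Stream"
import * as SubscriptionRef from "effect/SubscriptionRef"
import * as LanguageModel from "@effect/ai/LanguageModel"
import * as Prompt from "@effect/ai/Prompt"

// `semaphore` is assumed to have been created with Effect.makeSemaphore(1).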

const streamWithProtection = Stream.fromChannel(
  Channel.acquireUseRelease(
    // Acquire: Take semaphore, get checkpoint
    semaphore.take(1).pipe(
      Effect.zipRight(SubscriptionRef.get(history)),
      Effect.map((hist) => Prompt.merge(hist, newPrompt)),
      Effect.tap((checkpoint) =>
        SubscriptionRef.set(history, checkpoint)
      )
    ),

    // Use: Stream with accumulation
    (checkpoint) => LanguageModel.streamText({ prompt: checkpoint }).pipe(
      Stream.mapChunksEffect(accumulateAndUpdate),
      Stream.toChannel
    ),

    // Release: Always release semaphore
    () => semaphore.release(1)
  )
)

Resource acquisition order:

  1. Take semaphore (exclusive access)
  2. Get current history snapshot
  3. Merge with new prompt
  4. Update history with checkpoint
  5. Stream response (with incremental updates)
  6. Release semaphore (guaranteed via acquireUseRelease)
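The six steps above can be sketched without Effect to show why the ordering matters. This is a dependency-free analogue under stated assumptions: the `Mutex` class, `chatHistory`, and `streamWithProtection` below are hypothetical stand-ins, and `try`/`finally` in an async generator takes the place of `Channel.acquireUseRelease`.

```typescript
// Minimal promise-based mutex with a single permit (hypothetical helper).
class Mutex {
  private tail: Promise<void> = Promise.resolve()
  // Resolves with a release function once every earlier holder has released.
  acquire(): Promise<() => void> {
    let release!: () => void
    const next = new Promise<void>((resolve) => { release = resolve })
    const ready = this.tail.then(() => release)
    this.tail = next
    return ready
  }
}

const mutex = new Mutex()
let chatHistory: Array<string> = []
const eventLog: Array<string> = []

async function* streamWithProtection(
  prompt: string,
  reply: ReadonlyArray<string>
): AsyncGenerator<string> {
  const release = await mutex.acquire() // 1. take the permit
  try {
    const checkpoint = [...chatHistory, prompt] // 2-3. snapshot and merge
    chatHistory = checkpoint // 4. publish the checkpoint
    eventLog.push(`start:${prompt}`)
    let combined = ""
    for (const delta of reply) { // 5. stream with incremental updates
      await Promise.resolve()
      combined += delta
      chatHistory = [...checkpoint, combined]
      yield delta
    }
    eventLog.push(`end:${prompt}`)
  } finally {
    release() // 6. always release, even on error or early exit
  }
}

async function collect(gen: AsyncGenerator<string>): Promise<string> {
  let out = ""
  for await (const delta of gen) out += delta
  return out
}

// Two streams are started concurrently; the mutex serializes them.
const demo = Promise.all([
  collect(streamWithProtection("q1", ["a", "b"])),
  collect(streamWithProtection("q2", ["c"]))
]).then((outputs) => ({ outputs, eventLog, chatHistory }))
```

Because the second stream only takes its history snapshot after the first has released, its checkpoint already contains the first stream's complete response.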

Consumption Patterns

runForEach :: (A → Effect<X, E, R>) → Stream<A, E, R> → Effect<Unit, E, R>
runDrain   :: Stream<A, E, R> → Effect<Unit, E, R>
runLast    :: Stream<A, E, R> → Effect<Option<A>, E, R>

// Process each part with side effects
stream.pipe(
  Stream.runForEach((part) =>
    Match.value(part).pipe(
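      // Parts are discriminated by `type`, so use Match.discriminator, not Match.tag
      Match.discriminator("type")("text-delta", ({ delta }) => updateUI(delta)),
      Match.discriminator("type")("finish", ({ usage }) => recordMetrics(usage)),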
      Match.orElse(() => Effect.void)
    )
  )
)

// Consume without collecting (memory efficient)
stream.pipe(
  Stream.tap(logPart),
  Stream.runDrain
)

// Get final accumulated value
stream.pipe(
  Stream.runFold(initialState, (acc, part) => merge(acc, part)),
  Effect.map(Option.some)
)

History Update Pattern

Incremental merge strategy for conversation history:

Prompt.merge :: Prompt → Prompt → Prompt
Prompt.fromResponseParts :: Array<StreamPart> → Prompt

// Pattern: checkpoint + incremental merge
let combined = Prompt.em

...