Streaming

O(1) memory validation for large JSON arrays and NDJSON streams

valrs provides streaming validation for processing large files with constant memory usage. Instead of loading entire files into memory, items are validated one at a time as they arrive from the stream.

Why Streaming?

Traditional validation requires loading the entire dataset into memory:

// Memory usage: O(n) - entire file must fit in memory
const users = JSON.parse(await response.text());
const validated = v.array(User).parse(users);

With streaming validation, memory usage is constant:

// Memory usage: O(1) - only one item in memory at a time
for await (const user of stream(v.array(User), response.body!)) {
  await processUser(user);
}

This makes it possible to process inputs of any size: the limiting factor is the data source on disk or over the network, not available memory.

Streaming JSON Arrays

Use stream() to validate JSON arrays incrementally:

import { v, stream } from 'valrs';
 
const User = v.object({
  id: v.number(),
  name: v.string(),
  email: v.string().email(),
});
 
// Stream from fetch response
const response = await fetch('/api/users.json');
for await (const user of stream(v.array(User), response.body!)) {
  console.log(user); // Each user validated as it arrives
}

The input must be a valid JSON array (e.g., [{...}, {...}, {...}]). The parser correctly handles input that arrives split across multiple chunks, even when a chunk boundary falls in the middle of an item.
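
For example, the async generator below (made-up data, reusing the User schema and imports from above) splits the first object in the middle of a string; both users are still parsed and validated:

// Any async iterable of text chunks is a valid input (see "Supported Input Types").
// The chunk boundaries here deliberately split the first object mid-string.
async function* chunkedInput() {
  yield '[{"id": 1, "name": "Ali';
  yield 'ce", "email": "alice@example.com"},';
  yield '{"id": 2, "name": "Bob", "email": "bob@example.com"}]';
}

for await (const user of stream(v.array(User), chunkedInput())) {
  console.log(user.name); // "Alice", then "Bob"
}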

Streaming NDJSON

Use streamLines() for newline-delimited JSON (NDJSON), where each line is a separate JSON object:

import { v, streamLines } from 'valrs';
 
const LogEvent = v.object({
  timestamp: v.string().datetime(),
  level: v.enum(['info', 'warn', 'error']),
  message: v.string(),
});
 
// Each line is validated independently
for await (const event of streamLines(LogEvent, logStream)) {
  if (event.level === 'error') {
    alertOnError(event);
  }
}

NDJSON format (one JSON object per line):

{"timestamp": "2024-01-15T10:30:00Z", "level": "info", "message": "Started"}
{"timestamp": "2024-01-15T10:30:01Z", "level": "error", "message": "Failed"}

Stream Options

Both stream() and streamLines() accept an options object:

const result = stream(v.array(User), input, {
  maxItems: 10000,         // Stop after N items
  maxBytes: '100MB',       // Stop after N bytes processed
  timeout: '30s',          // Timeout duration
  onError: 'skip',         // Error handling strategy
  highWaterMark: 16,       // Backpressure threshold (items)
});

Options Reference

Option          Type                            Default     Description
maxItems        number                          Infinity    Maximum items to process
maxBytes        number | string                 Infinity    Maximum bytes to process
timeout         number | string                 Infinity    Processing timeout
onError         'throw' | 'skip' | 'collect'    'throw'     Error handling strategy
highWaterMark   number                          16          Backpressure threshold

Size and Duration Formats

maxBytes accepts numbers (bytes) or human-readable strings:

maxBytes: 1024          // 1024 bytes
maxBytes: '100KB'       // 100 kilobytes
maxBytes: '50MB'        // 50 megabytes
maxBytes: '1GB'         // 1 gigabyte

timeout accepts numbers (milliseconds) or duration strings:

timeout: 5000           // 5000 milliseconds
timeout: '30s'          // 30 seconds
timeout: '5m'           // 5 minutes
timeout: '1h'           // 1 hour
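
If it helps to picture the conversion, the sketch below shows one way such strings could map to plain numbers. It is illustrative only; valrs' own parsing rules (including whether 'KB' means 1,000 or 1,024 bytes) are not spelled out here.

// Illustrative conversion only; not valrs' implementation.
const SIZE_UNITS: Record<string, number> = { KB: 1024, MB: 1024 ** 2, GB: 1024 ** 3 };
const DURATION_UNITS: Record<string, number> = { s: 1_000, m: 60_000, h: 3_600_000 };

function toBytes(value: number | string): number {
  if (typeof value === 'number') return value; // already bytes
  const match = /^(\d+(?:\.\d+)?)(KB|MB|GB)$/.exec(value);
  if (!match) throw new Error(`Unrecognized size: ${value}`);
  return Number(match[1]) * SIZE_UNITS[match[2]];
}

function toMilliseconds(value: number | string): number {
  if (typeof value === 'number') return value; // already milliseconds
  const match = /^(\d+(?:\.\d+)?)(s|m|h)$/.exec(value);
  if (!match) throw new Error(`Unrecognized duration: ${value}`);
  return Number(match[1]) * DURATION_UNITS[match[2]];
}

toBytes('50MB');      // 52_428_800 (assuming binary multiples)
toMilliseconds('5m'); // 300_000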

Error Handling Modes

The onError option controls how validation failures are handled:

throw (default)

Stops processing and throws on the first validation error:

try {
  for await (const user of stream(v.array(User), input, { onError: 'throw' })) {
    process(user);
  }
} catch (error) {
  console.error('Validation failed:', error.message);
}

skip

Silently skips invalid items and continues processing:

// Invalid items are ignored, only valid items are yielded
for await (const user of stream(v.array(User), input, { onError: 'skip' })) {
  process(user); // Only receives valid users
}

collect

Continues processing but collects all errors for later inspection:

const result = stream(v.array(User), input, { onError: 'collect' });
 
for await (const user of result) {
  process(user);
}
 
// After iteration, check collected errors
if (result.errors.length > 0) {
  console.log(`${result.errors.length} items failed validation:`);
  for (const { index, error, rawValue } of result.errors) {
    console.log(`  Item ${index}: ${error.message}`);
  }
}

Each collected error contains:

  • index: The position of the failed item
  • error: The ValError with validation details
  • rawValue: The original parsed value (if available)
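
Written out as a TypeScript type, an entry looks roughly like this (a sketch; the interface name is hypothetical, and it assumes ValError is exported by valrs):

import type { ValError } from 'valrs';

// Hypothetical name for illustration; matches the fields listed above.
interface CollectedError {
  index: number;       // position of the failed item in the stream
  error: ValError;     // validation error with full details
  rawValue?: unknown;  // the original parsed value, when available
}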

Result Methods

toArray()

Collects all validated items into an array:

const users = await stream(v.array(User), input).toArray();
console.log(`Loaded ${users.length} users`);

Using toArray() loads all items into memory, negating the streaming benefit. Use it only when you need all items at once and know the dataset fits in memory.
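
If you only need a bounded sample, combining toArray() with maxItems keeps memory in check:

// Collect at most the first 100 items (input stands for any supported source)
const preview = await stream(v.array(User), input, { maxItems: 100 }).toArray();
console.log(`Previewing ${preview.length} users`);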

pipeTo()

Pipes validated items directly to a writable stream:

const validatedStream = stream(v.array(User), input);
 
const writable = new WritableStream<User>({
  write(user) {
    // Process each validated user
    database.insert(user);
  },
});
 
await validatedStream.pipeTo(writable);

This is useful for ETL pipelines where you want to write validated data to a database or file without holding everything in memory.
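
As a sketch, here is one way to write validated items to an NDJSON file in Node.js (the file name and serialization format are illustrative):

import { createWriteStream } from 'node:fs';
import { v, stream } from 'valrs';

const file = createWriteStream('valid-users.ndjson');

await stream(v.array(User), input).pipeTo(
  new WritableStream<User>({
    write(user) {
      // Append each validated user as one NDJSON line
      file.write(JSON.stringify(user) + '\n');
    },
    close() {
      file.end();
    },
  }),
);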

Real-World Examples

Streaming from Fetch Response

Process a large API response without loading it entirely into memory:

import { v, stream } from 'valrs';
 
const Product = v.object({
  id: v.string().uuid(),
  name: v.string(),
  price: v.number().positive(),
  inventory: v.number().int().nonnegative(),
});
 
async function syncProducts() {
  const response = await fetch('https://api.example.com/products.json');
 
  if (!response.ok) {
    throw new Error(`HTTP ${response.status}`);
  }
 
  let count = 0;
  for await (const product of stream(v.array(Product), response.body!, {
    maxItems: 100000,
    timeout: '5m',
    onError: 'skip',
  })) {
    await database.upsertProduct(product);
    count++;
 
    if (count % 1000 === 0) {
      console.log(`Processed ${count} products...`);
    }
  }
 
  console.log(`Sync complete: ${count} products`);
}

Streaming from File (Node.js)

Process a large JSON file from disk:

import { createReadStream } from 'node:fs';
import { v, stream } from 'valrs';
 
const Transaction = v.object({
  id: v.string(),
  amount: v.number(),
  currency: v.string().length(3),
  timestamp: v.string().datetime(),
});
 
async function processTransactionLog(filePath: string) {
  const fileStream = createReadStream(filePath);
 
  const result = stream(v.array(Transaction), fileStream, {
    maxBytes: '1GB',
    onError: 'collect',
  });
 
  let total = 0;
  for await (const tx of result) {
    total += tx.amount;
  }
 
  if (result.errors.length > 0) {
    console.warn(`${result.errors.length} invalid transactions skipped`);
  }
 
  return total;
}

Processing NDJSON Logs

Stream and analyze log files in NDJSON format:

import { createReadStream } from 'node:fs';
import { v, streamLines } from 'valrs';
 
const LogEntry = v.object({
  timestamp: v.string().datetime(),
  level: v.enum(['debug', 'info', 'warn', 'error']),
  service: v.string(),
  message: v.string(),
  metadata: v.record(v.string(), v.unknown()).optional(),
});
 
async function analyzeErrors(logPath: string) {
  const fileStream = createReadStream(logPath);
  const errors: Array<{ timestamp: string; service: string; message: string }> = [];
 
  for await (const entry of streamLines(LogEntry, fileStream, { onError: 'skip' })) {
    if (entry.level === 'error') {
      errors.push({
        timestamp: entry.timestamp,
        service: entry.service,
        message: entry.message,
      });
    }
  }
 
  return errors;
}

ETL Pipeline with Validation

Build an extract-transform-load pipeline with streaming validation:

import { v, stream } from 'valrs';
 
const RawRecord = v.object({
  id: v.string(),
  data: v.string(),
  created: v.string(),
});
 
const TransformedRecord = v.object({
  id: v.string().uuid(),
  payload: v.unknown(),
  createdAt: v.date(),
});
 
async function etlPipeline(sourceUrl: string) {
  const response = await fetch(sourceUrl);
 
  // Extract and validate
  const validated = stream(v.array(RawRecord), response.body!, {
    onError: 'collect',
  });
 
  // Transform and load
  const writer = database.bulkWriter();
 
  for await (const record of validated) {
    const transformed = {
      id: record.id,
      payload: JSON.parse(record.data),
      createdAt: new Date(record.created),
    };
 
    // Validate transformed data
    const result = TransformedRecord.safeParse(transformed);
    if (result.success) {
      writer.insert(result.data);
    }
  }
 
  await writer.flush();
 
  // Report any extraction errors
  if (validated.errors.length > 0) {
    console.warn(`${validated.errors.length} records failed validation`);
  }
}

Supported Input Types

Both stream() and streamLines() accept multiple input types:

// Web Streams API (browsers, Deno, Node.js 18+)
const response = await fetch('/data.json');
stream(schema, response.body!);
 
// Node.js Readable streams (automatically converted)
import { createReadStream } from 'node:fs';
stream(schema, createReadStream('data.json'));
 
// Any async iterable
async function* generateChunks() {
  yield '[{"id": 1},';
  yield '{"id": 2}]';
}
stream(schema, generateChunks());

Memory Efficiency

Streaming validation maintains O(1) memory usage by:

  1. Incremental parsing: JSON is parsed character-by-character, extracting complete items as they appear
  2. Immediate validation: Each item is validated and yielded before the next is parsed
  3. No buffering: Validated items are not stored; they flow through to your processing logic
  4. Backpressure support: The stream respects consumer speed via highWaterMark

This means you can process a 10GB file with the same memory footprint as a 10KB file.
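
To make point 1 above concrete, here is a rough, simplified sketch of depth-tracking item extraction over chunked input. It illustrates the idea only (and assumes the array elements are objects or arrays); it is not valrs' actual parser:

// Illustration only: split a chunked JSON array into complete item strings,
// holding just the unfinished tail in memory.
async function* splitArrayItems(chunks: AsyncIterable<string>): AsyncGenerator<string> {
  let buffer = '';
  let scanned = 0;      // index up to which buffer has been scanned
  let depth = 0;        // nesting depth inside the current item
  let inString = false;
  let escaped = false;
  let itemStart = -1;   // where the current item begins, or -1 between items
  let started = false;  // true once the outer '[' has been seen

  for await (const chunk of chunks) {
    buffer += chunk;
    for (; scanned < buffer.length; scanned++) {
      const ch = buffer[scanned];
      if (inString) {
        if (escaped) escaped = false;
        else if (ch === '\\') escaped = true;
        else if (ch === '"') inString = false;
      } else if (ch === '"') {
        inString = true;
      } else if (!started) {
        if (ch === '[') started = true;
      } else if (ch === '{' || ch === '[') {
        if (depth === 0) itemStart = scanned;
        depth++;
      } else if ((ch === '}' || ch === ']') && depth > 0) {
        depth--;
        if (depth === 0) {
          yield buffer.slice(itemStart, scanned + 1); // complete item: hand it off
          itemStart = -1;
        }
      }
    }
    // Drop everything already emitted so memory stays proportional to one item
    const keepFrom = itemStart >= 0 ? itemStart : buffer.length;
    buffer = buffer.slice(keepFrom);
    scanned -= keepFrom;
    if (itemStart >= 0) itemStart = 0;
  }
}

// Each extracted item is then parsed and validated individually, for example:
// for await (const raw of splitArrayItems(chunks)) {
//   const user = User.parse(JSON.parse(raw));
// }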

When to Use Streaming

Scenario                           Recommended Approach
File < 10MB                        Regular parse() is fine
File 10MB - 100MB                  Consider streaming
File > 100MB                       Always use streaming
Unknown size (API response)        Use streaming for safety
Real-time data (WebSocket, SSE)    Use streaming

Testing with Mock Streams

For testing, use the provided helper functions:

import { v, stream, createMockStream, createChunkedStream } from 'valrs';
 
// Create a stream from chunks
const mockStream = createMockStream([
  '[{"id": 1, "name": "Alice"},',
  '{"id": 2, "name": "Bob"}]',
]);
 
for await (const user of stream(v.array(User), mockStream)) {
  console.log(user);
}
 
// Create a stream from a complete JSON string, auto-chunked
const chunkedStream = createChunkedStream(
  JSON.stringify([{ id: 1 }, { id: 2 }, { id: 3 }]),
  32 // chunk size in characters
);

Next Steps