Skip to main content

Chapter 24: Real-Time Data

Real-time features keep the UI synchronized with server state without polling. This chapter covers Server-Sent Events (SSE) and WebSockets, with integration into TanStack Query.

SSE vs. WebSockets​

FeatureSSEWebSocket
DirectionServer → Client (one-way)Bidirectional
ProtocolHTTP/1.1 or HTTP/2WebSocket (ws://)
ReconnectionAutomatic (built-in)Manual
Proxy/firewall friendlyYesSometimes problematic
Binary dataNo (text only)Yes
Best forNotifications, feeds, live updatesChat, collaborative editing, gaming

Recommendation: Use SSE for most real-time features. Use WebSockets only when you need bidirectional communication.

Server-Sent Events​

Custom React Hook​

// shared/hooks/use-sse.ts
import { useEffect, useRef, useCallback } from "react";

interface UseSSEOptions<T> {
url: string;
onMessage: (data: T) => void;
onError?: (event: Event) => void;
enabled?: boolean;
}

export function useSSE<T>({ url, onMessage, onError, enabled = true }: UseSSEOptions<T>) {
const eventSourceRef = useRef<EventSource | null>(null);
const onMessageRef = useRef(onMessage);
onMessageRef.current = onMessage;

useEffect(() => {
if (!enabled) return;

const eventSource = new EventSource(url);
eventSourceRef.current = eventSource;

eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data) as T;
onMessageRef.current(data);
} catch (error) {
console.error("Failed to parse SSE message:", error);
}
};

eventSource.onerror = (event) => {
onError?.(event);
// EventSource automatically reconnects
};

return () => {
eventSource.close();
eventSourceRef.current = null;
};
}, [url, enabled, onError]);

const close = useCallback(() => {
eventSourceRef.current?.close();
eventSourceRef.current = null;
}, []);

return { close };
}

Integrating SSE with TanStack Query​

The real power: SSE events invalidate TanStack Query caches, triggering automatic refetches:

// features/projects/hooks/use-project-events.ts
import { useQueryClient } from "@tanstack/react-query";
import { useSSE } from "@/shared/hooks/use-sse";
import { projectKeys } from "./query-keys";

interface ProjectEvent {
type: "project:created" | "project:updated" | "project:deleted" | "task:updated";
projectId: string;
data?: unknown;
}

export function useProjectEvents(organizationId: string) {
const queryClient = useQueryClient();

useSSE<ProjectEvent>({
url: `/api/events/projects?orgId=${organizationId}`,
onMessage: (event) => {
switch (event.type) {
case "project:created":
case "project:deleted":
// Invalidate lists — they need to refetch
queryClient.invalidateQueries({ queryKey: projectKeys.lists() });
break;

case "project:updated":
// Invalidate the specific project
queryClient.invalidateQueries({
queryKey: projectKeys.detail(event.projectId),
});
// Also invalidate lists since the project data changed
queryClient.invalidateQueries({ queryKey: projectKeys.lists() });
break;

case "task:updated":
// Invalidate tasks for this project
queryClient.invalidateQueries({
queryKey: projectKeys.detailTasks(event.projectId),
});
break;
}
},
});
}

// Use in a layout component so events are always active
function AuthenticatedLayout() {
const { organizationId } = useCurrentOrganization();
useProjectEvents(organizationId);

return (
<AppLayout>
<Outlet />
</AppLayout>
);
}

Notification System​

A complete notification system using SSE:

// features/notifications/hooks/use-notifications.ts
interface Notification {
id: string;
type: "info" | "success" | "warning" | "error";
title: string;
message: string;
read: boolean;
createdAt: string;
}

export function useNotificationStream() {
const queryClient = useQueryClient();

useSSE<Notification>({
url: "/api/events/notifications",
onMessage: (notification) => {
// Add to notification cache
queryClient.setQueryData<Notification[]>(
["notifications"],
(old = []) => [notification, ...old]
);

// Show toast for urgent notifications
if (notification.type === "error" || notification.type === "warning") {
toast[notification.type](notification.title, {
description: notification.message,
});
}
},
});
}

// Notification bell with unread count
function NotificationBell() {
const { data: notifications = [] } = useQuery({
queryKey: ["notifications"],
queryFn: () =>
AppRuntime.runPromise(NotificationService.listRecent()),
});

const unreadCount = notifications.filter((n) => !n.read).length;

return (
<Button variant="ghost" size="icon" className="relative">
<BellIcon className="h-5 w-5" />
{unreadCount > 0 && (
<span className="absolute -right-1 -top-1 flex h-5 w-5 items-center justify-center rounded-full bg-destructive text-xs text-destructive-foreground">
{unreadCount > 9 ? "9+" : unreadCount}
</span>
)}
</Button>
);
}

WebSocket for Bidirectional Communication​

For features requiring bidirectional communication (collaborative editing, chat):

// shared/hooks/use-websocket.ts
import { useEffect, useRef, useState, useCallback } from "react";

interface UseWebSocketOptions {
url: string;
onMessage?: (data: unknown) => void;
reconnectInterval?: number;
maxRetries?: number;
}

export function useWebSocket({ url, onMessage, reconnectInterval = 3000, maxRetries = 5 }: UseWebSocketOptions) {
const wsRef = useRef<WebSocket | null>(null);
const retriesRef = useRef(0);
const [isConnected, setIsConnected] = useState(false);

const connect = useCallback(() => {
const ws = new WebSocket(url);
wsRef.current = ws;

ws.onopen = () => {
setIsConnected(true);
retriesRef.current = 0;
};

ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
onMessage?.(data);
} catch {
onMessage?.(event.data);
}
};

ws.onclose = () => {
setIsConnected(false);
if (retriesRef.current < maxRetries) {
retriesRef.current += 1;
setTimeout(connect, reconnectInterval);
}
};

ws.onerror = () => {
ws.close();
};
}, [url, onMessage, reconnectInterval, maxRetries]);

useEffect(() => {
connect();
return () => {
wsRef.current?.close();
};
}, [connect]);

const send = useCallback((data: unknown) => {
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(JSON.stringify(data));
}
}, []);

return { send, isConnected };
}

Summary​

  • ✅ SSE for one-way server-to-client updates (notifications, live feeds)
  • ✅ WebSockets for bidirectional communication (chat, collaboration)
  • ✅ SSE events invalidate TanStack Query caches for automatic UI updates
  • ✅ Built-in reconnection for SSE; manual reconnection logic for WebSockets
  • ✅ Notification system combining SSE, TanStack Query cache, and toast notifications