Real-time Applications with AWS WebSockets and TypeScript
Modern applications increasingly demand real-time capabilities, from live chat and collaborative editing to live dashboards and gaming. In this final post of our AWS and TypeScript series, we’ll build scalable, type-safe real-time communication on Amazon API Gateway WebSocket APIs and Lambda functions, using a chat application as the running example.
Understanding WebSocket APIs with AWS
AWS API Gateway WebSocket APIs provide a fully managed service for building real-time, bidirectional communication applications. Key advantages include:
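- Persistent Connections: Keep a connection open for up to two hours (idle connections close after 10 minutes), so the server can push messages without polling
- Automatic Scaling: Handle thousands of concurrent connections without infrastructure management
- Pay-per-Use: Billed per message and per connection-minute, so nothing is charged while no clients are connected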
- AWS Integration: Seamless integration with Lambda, DynamoDB, and other AWS services
- Route-Based Architecture: Organize message handling with custom routes
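To make the route-based model concrete: with a route selection expression of $request.body.action (the one used in the template later in this post), API Gateway reads the action field of each JSON frame and invokes the route with that key, falling back to $default when nothing matches. The helper below is only a local imitation of that rule, which can be handy in unit tests; selectRouteKey is a hypothetical name, not an AWS API.

```typescript
// Hypothetical helper that imitates route selection for the expression
// "$request.body.action". It is a local model, not an AWS SDK call.
function selectRouteKey(rawBody: string, definedRoutes: ReadonlySet<string>): string {
  try {
    const parsed: unknown = JSON.parse(rawBody);
    if (typeof parsed === 'object' && parsed !== null) {
      const action = (parsed as Record<string, unknown>).action;
      if (typeof action === 'string' && definedRoutes.has(action)) {
        return action; // a custom route with this key exists
      }
    }
  } catch {
    // Non-JSON frames cannot match a custom route
  }
  return '$default';
}

const routes = new Set(['sendMessage', 'joinRoom']);
console.log(selectRouteKey('{"action":"sendMessage","data":{}}', routes)); // sendMessage
console.log(selectRouteKey('{"action":"unknown"}', routes));               // $default
console.log(selectRouteKey('not json', routes));                           // $default
```

Since the template in this post defines only $connect, $disconnect, and $default, every chat frame ends up on $default and the dispatch by action happens inside the Lambda handler.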
Prerequisites
Essential knowledge and tools for WebSocket development:
- Previous Series Posts: Lambda, DynamoDB, and API Gateway fundamentals
- AWS SDK v3: API Gateway Management API and DynamoDB clients
- WebSocket Concepts: Connection lifecycle, message routing, and error handling
- Real-time Patterns: Broadcasting, presence management, and state synchronization
Type-Safe WebSocket Architecture
Start with shared types for WebSocket events, connections, and chat entities:
// src/types/websocket.ts
export interface WebSocketEvent {
requestContext: {
connectionId: string;
routeKey: string;
stage: string;
apiId: string;
domainName: string;
eventType: 'CONNECT' | 'DISCONNECT' | 'MESSAGE';
messageId?: string;
requestTime: string;
requestTimeEpoch: number;
identity: {
sourceIp: string;
userAgent: string;
};
authorizer?: {
userId: string;
username: string;
[key: string]: any;
};
};
body?: string;
isBase64Encoded: boolean;
headers?: Record<string, string>; // sent with $connect events only
queryStringParameters?: Record<string, string>;
}
export interface WebSocketResponse {
statusCode: number;
headers?: Record<string, string>;
body?: string;
}
export interface ConnectionData {
connectionId: string;
userId: string;
username: string;
roomId?: string;
connectedAt: string;
lastSeenAt: string;
userAgent: string;
ipAddress: string;
}
export interface ChatMessage {
messageId: string;
roomId: string;
userId: string;
username: string;
content: string;
messageType: MessageType;
timestamp: string;
editedAt?: string;
replyToMessageId?: string;
}
export interface ChatRoom {
roomId: string;
name: string;
description?: string;
createdBy: string;
createdAt: string;
isPrivate: boolean;
members: string[];
lastMessageAt?: string;
lastMessagePreview?: string;
}
export enum MessageType {
TEXT = 'TEXT',
IMAGE = 'IMAGE',
FILE = 'FILE',
SYSTEM = 'SYSTEM',
TYPING = 'TYPING'
}
export enum WebSocketAction {
JOIN_ROOM = 'joinRoom',
LEAVE_ROOM = 'leaveRoom',
SEND_MESSAGE = 'sendMessage',
EDIT_MESSAGE = 'editMessage',
DELETE_MESSAGE = 'deleteMessage',
TYPING_START = 'typingStart',
TYPING_STOP = 'typingStop',
LIST_ROOMS = 'listRooms',
CREATE_ROOM = 'createRoom',
GET_ROOM_HISTORY = 'getRoomHistory'
}
export interface WebSocketMessage {
action: WebSocketAction;
data: any;
requestId?: string;
}
export interface WebSocketBroadcast {
type: 'message' | 'user_joined' | 'user_left' | 'typing' | 'room_update';
data: any;
roomId?: string;
excludeConnectionId?: string;
}
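Because JSON.parse returns untyped data, the WebSocketMessage interface alone does not protect a handler from malformed frames. One option is a small runtime check before dispatching. The sketch below is self-contained, so it repeats the action names instead of importing the enum; parseMessage and ParsedMessage are hypothetical names introduced for illustration.

```typescript
// Action names mirror the WebSocketAction enum above.
const KNOWN_ACTIONS = new Set<string>([
  'joinRoom', 'leaveRoom', 'sendMessage', 'editMessage', 'deleteMessage',
  'typingStart', 'typingStop', 'listRooms', 'createRoom', 'getRoomHistory',
]);

interface ParsedMessage {
  action: string;
  data: Record<string, unknown>;
  requestId?: string;
}

// Hypothetical helper: returns a validated message, or null if the frame is malformed.
function parseMessage(raw: string): ParsedMessage | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null; // not JSON at all
  }
  if (typeof value !== 'object' || value === null) return null;

  const candidate = value as Record<string, unknown>;
  const action = candidate.action;
  if (typeof action !== 'string' || !KNOWN_ACTIONS.has(action)) return null;

  // Treat a missing or non-object payload as an empty object
  const rawData = candidate.data;
  const data =
    typeof rawData === 'object' && rawData !== null && !Array.isArray(rawData)
      ? (rawData as Record<string, unknown>)
      : {};
  const requestId = typeof candidate.requestId === 'string' ? candidate.requestId : undefined;
  return { action, data, requestId };
}

console.log(parseMessage('{"action":"joinRoom","data":{"roomId":"r1"}}')?.data.roomId); // r1
console.log(parseMessage('{"action":"dropTable"}')); // null
```

Normalizing data to an object here means individual handlers can destructure it without guarding against undefined.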
Connection Management Service
Implement robust connection management with DynamoDB storage:
// src/services/connection-service.ts
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import {
DynamoDBDocumentClient,
PutCommand,
GetCommand,
DeleteCommand,
QueryCommand,
UpdateCommand
} from '@aws-sdk/lib-dynamodb';
import { ConnectionData } from '../types/websocket';
export class ConnectionService {
private docClient: DynamoDBDocumentClient;
private tableName: string;
constructor(tableName: string) {
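// removeUndefinedValues lets optional attributes (roomId, gsi2pk, gsi2sk) be omitted;
// without it the document client throws when an item contains undefined values
this.docClient = DynamoDBDocumentClient.from(new DynamoDBClient({}), {
marshallOptions: { removeUndefinedValues: true }
});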
this.tableName = tableName;
}
async saveConnection(connectionData: ConnectionData): Promise<void> {
await this.docClient.send(new PutCommand({
TableName: this.tableName,
Item: {
pk: `CONNECTION#${connectionData.connectionId}`,
sk: `CONNECTION#${connectionData.connectionId}`,
gsi1pk: `USER#${connectionData.userId}`,
gsi1sk: `CONNECTION#${connectionData.connectedAt}`,
gsi2pk: connectionData.roomId ? `ROOM#${connectionData.roomId}` : undefined,
gsi2sk: connectionData.roomId ? `CONNECTION#${connectionData.connectionId}` : undefined,
entityType: 'CONNECTION',
...connectionData,
ttl: Math.floor(Date.now() / 1000) + (24 * 60 * 60) // 24 hours TTL
}
}));
}
async getConnection(connectionId: string): Promise<ConnectionData | null> {
const { Item } = await this.docClient.send(new GetCommand({
TableName: this.tableName,
Key: { pk: `CONNECTION#${connectionId}`, sk: `CONNECTION#${connectionId}` }
}));
return Item ? this.mapItemToConnection(Item) : null;
}
async deleteConnection(connectionId: string): Promise<void> {
await this.docClient.send(new DeleteCommand({
TableName: this.tableName,
Key: { pk: `CONNECTION#${connectionId}`, sk: `CONNECTION#${connectionId}` }
}));
}
async updateConnectionRoom(connectionId: string, roomId: string | null): Promise<void> {
const updateExpression = roomId
? 'SET roomId = :roomId, gsi2pk = :gsi2pk, gsi2sk = :gsi2sk, lastSeenAt = :lastSeenAt'
: 'SET lastSeenAt = :lastSeenAt REMOVE roomId, gsi2pk, gsi2sk';
const attributeValues: Record<string, any> = {
':lastSeenAt': new Date().toISOString()
};
if (roomId) {
Object.assign(attributeValues, {
':roomId': roomId,
':gsi2pk': `ROOM#${roomId}`,
':gsi2sk': `CONNECTION#${connectionId}`
});
}
await this.docClient.send(new UpdateCommand({
TableName: this.tableName,
Key: { pk: `CONNECTION#${connectionId}`, sk: `CONNECTION#${connectionId}` },
UpdateExpression: updateExpression,
ExpressionAttributeValues: attributeValues
}));
}
async getConnectionsByRoom(roomId: string): Promise<ConnectionData[]> {
const { Items } = await this.docClient.send(new QueryCommand({
TableName: this.tableName,
IndexName: 'GSI2',
KeyConditionExpression: 'gsi2pk = :roomPk',
ExpressionAttributeValues: { ':roomPk': `ROOM#${roomId}` }
}));
return Items?.map(this.mapItemToConnection) || [];
}
async getConnectionsByUser(userId: string): Promise<ConnectionData[]> {
const { Items } = await this.docClient.send(new QueryCommand({
TableName: this.tableName,
IndexName: 'GSI1',
KeyConditionExpression: 'gsi1pk = :userPk',
ExpressionAttributeValues: { ':userPk': `USER#${userId}` }
}));
return Items?.map(this.mapItemToConnection) || [];
}
private mapItemToConnection(item: any): ConnectionData {
return {
connectionId: item.connectionId,
userId: item.userId,
username: item.username,
roomId: item.roomId,
connectedAt: item.connectedAt,
lastSeenAt: item.lastSeenAt,
userAgent: item.userAgent,
ipAddress: item.ipAddress
};
}
}
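The key layout in saveConnection relies on two DynamoDB behaviors: an item that lacks gsi2pk simply does not appear in GSI2 (a sparse index), and the ttl attribute holds an epoch time in seconds after which the item becomes eligible for deletion. A minimal sketch of that key-building logic as a pure function follows; buildConnectionKeys and its parameters are hypothetical names, and the clock is passed in so the result is deterministic.

```typescript
interface ConnectionKeysInput {
  connectionId: string;
  userId: string;
  connectedAt: string; // ISO 8601 timestamp
  roomId?: string;
}

// Hypothetical helper mirroring the key attributes written by saveConnection.
function buildConnectionKeys(
  input: ConnectionKeysInput,
  nowMs: number,
  ttlSeconds: number = 24 * 60 * 60
): Record<string, string | number> {
  const keys: Record<string, string | number> = {
    pk: `CONNECTION#${input.connectionId}`,
    sk: `CONNECTION#${input.connectionId}`,
    gsi1pk: `USER#${input.userId}`,
    gsi1sk: `CONNECTION#${input.connectedAt}`,
    ttl: Math.floor(nowMs / 1000) + ttlSeconds, // epoch seconds, not milliseconds
  };
  // Only connections that joined a room get GSI2 keys, which keeps GSI2 sparse
  if (input.roomId) {
    keys.gsi2pk = `ROOM#${input.roomId}`;
    keys.gsi2sk = `CONNECTION#${input.connectionId}`;
  }
  return keys;
}

const lobbyKeys = buildConnectionKeys(
  { connectionId: 'abc', userId: 'u1', connectedAt: '2024-01-01T00:00:00.000Z' },
  1_700_000_000_000
);
console.log(lobbyKeys.ttl);         // 1700086400
console.log('gsi2pk' in lobbyKeys); // false
```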
WebSocket Communication Service
Create a service for sending messages to WebSocket connections:
// src/services/websocket-service.ts
import { ApiGatewayManagementApiClient, PostToConnectionCommand } from '@aws-sdk/client-apigatewaymanagementapi';
import { ConnectionService } from './connection-service';
import { WebSocketBroadcast } from '../types/websocket';
export class WebSocketService {
private apiGatewayClient: ApiGatewayManagementApiClient;
private connectionService: ConnectionService;
constructor(endpoint: string, connectionService: ConnectionService) {
this.apiGatewayClient = new ApiGatewayManagementApiClient({ endpoint });
this.connectionService = connectionService;
}
async sendToConnection(connectionId: string, data: any): Promise<boolean> {
try {
await this.apiGatewayClient.send(new PostToConnectionCommand({
ConnectionId: connectionId,
Data: JSON.stringify(data)
}));
return true;
} catch (error) {
console.error(`Failed to send to ${connectionId}:`, error);
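// Clean up stale connections (410 Gone). AWS SDK v3 reports the HTTP status
// under error.$metadata.httpStatusCode; error.statusCode was the v2 field.
const status = (error as { $metadata?: { httpStatusCode?: number } }).$metadata?.httpStatusCode;
if (status === 410) {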
await this.connectionService.deleteConnection(connectionId);
}
return false;
}
}
async broadcastToRoom(broadcast: WebSocketBroadcast): Promise<{ successful: number; failed: number }> {
if (!broadcast.roomId) {
throw new Error('Room ID required for room broadcasts');
}
const connections = await this.connectionService.getConnectionsByRoom(broadcast.roomId);
const filteredConnections = connections.filter(
conn => conn.connectionId !== broadcast.excludeConnectionId
);
const results = await Promise.allSettled(
filteredConnections.map(connection =>
this.sendToConnection(connection.connectionId, broadcast.data)
)
);
const successful = results.filter(
result => result.status === 'fulfilled' && result.value
).length;
return {
successful,
failed: filteredConnections.length - successful
};
}
async broadcastToUser(
userId: string,
data: any,
excludeConnectionId?: string
): Promise<{ successful: number; failed: number }> {
const connections = await this.connectionService.getConnectionsByUser(userId);
const filteredConnections = connections.filter(
conn => conn.connectionId !== excludeConnectionId
);
const results = await Promise.allSettled(
filteredConnections.map(connection =>
this.sendToConnection(connection.connectionId, data)
)
);
const successful = results.filter(
result => result.status === 'fulfilled' && result.value
).length;
return {
successful,
failed: filteredConnections.length - successful
};
}
async sendError(connectionId: string, error: string, requestId?: string): Promise<void> {
await this.sendToConnection(connectionId, {
type: 'error',
error,
requestId,
timestamp: new Date().toISOString()
});
}
async sendSuccess(connectionId: string, data: any, requestId?: string): Promise<void> {
await this.sendToConnection(connectionId, {
type: 'success',
data,
requestId,
timestamp: new Date().toISOString()
});
}
}
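Both broadcast methods above end with the same tallying step over the Promise.allSettled results. One way to factor it out is a small pure function, sketched below. The Settled type is declared locally so the snippet stands alone; it has the same shape as the built-in PromiseSettledResult, and tallyResults is a hypothetical name.

```typescript
// Same shape as the built-in PromiseSettledResult<T>, declared here for self-containment
type Settled<T> =
  | { status: 'fulfilled'; value: T }
  | { status: 'rejected'; reason: unknown };

interface BroadcastResult {
  successful: number;
  failed: number;
}

// Counts a send as successful only if the promise fulfilled AND resolved to true
function tallyResults(results: ReadonlyArray<Settled<boolean>>): BroadcastResult {
  const successful = results.filter(
    result => result.status === 'fulfilled' && result.value
  ).length;
  return { successful, failed: results.length - successful };
}

const sample: Settled<boolean>[] = [
  { status: 'fulfilled', value: true },                 // delivered
  { status: 'fulfilled', value: false },                // sendToConnection returned false
  { status: 'rejected', reason: new Error('network') }, // unexpected rejection
];
console.log(tallyResults(sample)); // { successful: 1, failed: 2 }
```

With a helper like this, broadcastToRoom and broadcastToUser could each return tallyResults(results) after awaiting Promise.allSettled.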
Chat and Room Management
Implement comprehensive chat functionality:
// src/services/chat-service.ts
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import {
DynamoDBDocumentClient,
PutCommand,
GetCommand,
QueryCommand,
UpdateCommand
} from '@aws-sdk/lib-dynamodb';
import { v4 as uuidv4 } from 'uuid';
import { ChatMessage, ChatRoom, MessageType } from '../types/websocket';
export class ChatService {
private docClient: DynamoDBDocumentClient;
private tableName: string;
constructor(tableName: string) {
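// removeUndefinedValues lets optional fields (description, replyToMessageId) be omitted;
// without it the document client throws when an item contains undefined values
this.docClient = DynamoDBDocumentClient.from(new DynamoDBClient({}), {
marshallOptions: { removeUndefinedValues: true }
});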
this.tableName = tableName;
}
async createRoom(roomData: {
name: string;
description?: string;
createdBy: string;
isPrivate: boolean;
members: string[];
}): Promise<ChatRoom> {
const roomId = uuidv4();
const timestamp = new Date().toISOString();
const room: ChatRoom = {
roomId,
name: roomData.name,
description: roomData.description,
createdBy: roomData.createdBy,
createdAt: timestamp,
isPrivate: roomData.isPrivate,
members: [...new Set([roomData.createdBy, ...roomData.members])]
};
await this.docClient.send(new PutCommand({
TableName: this.tableName,
Item: {
pk: `ROOM#${roomId}`,
sk: `ROOM#${roomId}`,
gsi1pk: `ROOMS#${roomData.isPrivate ? 'PRIVATE' : 'PUBLIC'}`,
gsi1sk: `ROOM#${timestamp}`,
entityType: 'ROOM',
...room
}
}));
return room;
}
async getRoom(roomId: string): Promise<ChatRoom | null> {
const { Item } = await this.docClient.send(new GetCommand({
TableName: this.tableName,
Key: { pk: `ROOM#${roomId}`, sk: `ROOM#${roomId}` }
}));
return Item ? this.mapItemToRoom(Item) : null;
}
async listRooms(isPrivate: boolean = false, limit: number = 50): Promise<ChatRoom[]> {
const { Items } = await this.docClient.send(new QueryCommand({
TableName: this.tableName,
IndexName: 'GSI1',
KeyConditionExpression: 'gsi1pk = :roomType',
ExpressionAttributeValues: {
':roomType': `ROOMS#${isPrivate ? 'PRIVATE' : 'PUBLIC'}`
},
Limit: limit,
ScanIndexForward: false
}));
return Items?.map(this.mapItemToRoom) || [];
}
async sendMessage(messageData: {
roomId: string;
userId: string;
username: string;
content: string;
messageType?: MessageType;
replyToMessageId?: string;
}): Promise<ChatMessage> {
const messageId = uuidv4();
const timestamp = new Date().toISOString();
const message: ChatMessage = {
messageId,
roomId: messageData.roomId,
userId: messageData.userId,
username: messageData.username,
content: messageData.content,
messageType: messageData.messageType || MessageType.TEXT,
timestamp,
replyToMessageId: messageData.replyToMessageId
};
// Store message
await this.docClient.send(new PutCommand({
TableName: this.tableName,
Item: {
pk: `ROOM#${messageData.roomId}`,
sk: `MESSAGE#${timestamp}#${messageId}`,
gsi1pk: `MESSAGE#${messageId}`,
gsi1sk: `MESSAGE#${messageId}`,
entityType: 'MESSAGE',
...message
}
}));
// Update room's last message
await this.updateRoomLastMessage(messageData.roomId, message);
return message;
}
async getRoomMessages(
roomId: string,
limit: number = 50,
exclusiveStartKey?: Record<string, any>
): Promise<{ messages: ChatMessage[]; lastEvaluatedKey?: Record<string, any> }> {
const { Items, LastEvaluatedKey } = await this.docClient.send(new QueryCommand({
TableName: this.tableName,
KeyConditionExpression: 'pk = :roomPk AND begins_with(sk, :messagePrefix)',
ExpressionAttributeValues: {
':roomPk': `ROOM#${roomId}`,
':messagePrefix': 'MESSAGE#'
},
Limit: limit,
ScanIndexForward: false, // Latest messages first
ExclusiveStartKey: exclusiveStartKey
}));
return {
messages: Items?.map(this.mapItemToMessage) || [],
lastEvaluatedKey: LastEvaluatedKey
};
}
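async editMessage(
messageId: string,
newContent: string,
userId: string
): Promise<ChatMessage | null> {
const timestamp = new Date().toISOString();
// Messages are stored under pk = ROOM#<roomId>, sk = MESSAGE#<timestamp>#<messageId>,
// so resolve the primary key through GSI1 (gsi1pk = MESSAGE#<messageId>) first
const { Items } = await this.docClient.send(new QueryCommand({
TableName: this.tableName,
IndexName: 'GSI1',
KeyConditionExpression: 'gsi1pk = :messagePk',
ExpressionAttributeValues: { ':messagePk': `MESSAGE#${messageId}` },
Limit: 1
}));
const existing = Items?.[0];
if (!existing) {
return null; // No such message
}
try {
const { Attributes } = await this.docClient.send(new UpdateCommand({
TableName: this.tableName,
Key: { pk: existing.pk, sk: existing.sk },
UpdateExpression: 'SET content = :content, editedAt = :editedAt',
// Only the author may edit the message
ConditionExpression: 'userId = :userId',
ExpressionAttributeValues: {
':content': newContent,
':editedAt': timestamp,
':userId': userId
},
ReturnValues: 'ALL_NEW'
}));
return Attributes ? this.mapItemToMessage(Attributes) : null;
} catch (error) {
if ((error as Error).name === 'ConditionalCheckFailedException') {
throw new Error('Unauthorized to edit this message');
}
throw error;
}
}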
private async updateRoomLastMessage(roomId: string, message: ChatMessage): Promise<void> {
const preview = message.content.length > 100
? `${message.content.substring(0, 100)}...`
: message.content;
await this.docClient.send(new UpdateCommand({
TableName: this.tableName,
Key: { pk: `ROOM#${roomId}`, sk: `ROOM#${roomId}` },
UpdateExpression: 'SET lastMessageAt = :timestamp, lastMessagePreview = :preview',
ExpressionAttributeValues: {
':timestamp': message.timestamp,
':preview': `${message.username}: ${preview}`
}
}));
}
private mapItemToRoom(item: any): ChatRoom {
return {
roomId: item.roomId,
name: item.name,
description: item.description,
createdBy: item.createdBy,
createdAt: item.createdAt,
isPrivate: item.isPrivate,
members: item.members,
lastMessageAt: item.lastMessageAt,
lastMessagePreview: item.lastMessagePreview
};
}
private mapItemToMessage(item: any): ChatMessage {
return {
messageId: item.messageId,
roomId: item.roomId,
userId: item.userId,
username: item.username,
content: item.content,
messageType: item.messageType,
timestamp: item.timestamp,
editedAt: item.editedAt,
replyToMessageId: item.replyToMessageId
};
}
}
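The sort key format used by sendMessage, MESSAGE#<timestamp>#<messageId>, is what keeps getRoomMessages cheap: the ISO 8601 UTC strings produced by toISOString have a fixed length, so they sort lexicographically in chronological order, and a single Query with ScanIndexForward: false returns the newest messages first. A small sketch of that property (messageSortKey is a hypothetical helper and the sample values are made up):

```typescript
// Hypothetical helper that builds the same sort key as sendMessage
function messageSortKey(timestamp: string, messageId: string): string {
  return `MESSAGE#${timestamp}#${messageId}`;
}

const sortKeys = [
  messageSortKey('2024-03-01T10:00:05.000Z', 'b'),
  messageSortKey('2024-03-01T09:59:59.000Z', 'c'),
  messageSortKey('2024-03-01T10:00:05.000Z', 'a'),
];

// Plain string sort is chronological; reversing it mimics ScanIndexForward: false
const newestFirst = [...sortKeys].sort().reverse();
console.log(newestFirst);
// [
//   'MESSAGE#2024-03-01T10:00:05.000Z#b',
//   'MESSAGE#2024-03-01T10:00:05.000Z#a',
//   'MESSAGE#2024-03-01T09:59:59.000Z#c'
// ]
```

Appending the messageId keeps keys unique when two messages share a millisecond, at the cost of an arbitrary order between them.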
WebSocket Lambda Handlers
Implement the Lambda functions that handle WebSocket events:
// src/handlers/websocket/connect.ts
import { WebSocketEvent, WebSocketResponse } from '../../types/websocket';
import { ConnectionService } from '../../services/connection-service';
const connectionService = new ConnectionService(process.env.TABLE_NAME!);
export const handler = async (event: WebSocketEvent): Promise<WebSocketResponse> => {
console.log('WebSocket connection event:', JSON.stringify(event, null, 2));
try {
const { connectionId, authorizer, identity, requestTime } = event.requestContext;
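// requestContext.authorizer is populated only when a Lambda authorizer is attached to the $connect route
if (!authorizer || !authorizer.userId || !authorizer.username) {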
console.error('Missing authorization data');
return { statusCode: 401 };
}
const connectionData = {
connectionId,
userId: authorizer.userId,
username: authorizer.username,
connectedAt: requestTime,
lastSeenAt: requestTime,
userAgent: identity.userAgent,
ipAddress: identity.sourceIp,
};
await connectionService.saveConnection(connectionData);
console.log(`Connection ${connectionId} saved for user ${authorizer.userId}`);
return { statusCode: 200 };
} catch (error) {
console.error('Error handling connection:', error);
return { statusCode: 500 };
}
};
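The connect handler reads userId and username from requestContext.authorizer, which API Gateway fills from the context object returned by a Lambda authorizer on the $connect route. This post does not show that authorizer, so the following is only a sketch of the response shape it would need to return. The token format and verifyToken are placeholders invented for illustration; a real implementation would verify a signed token such as a JWT.

```typescript
type Effect = 'Allow' | 'Deny';

interface AuthorizerResponse {
  principalId: string;
  policyDocument: {
    Version: '2012-10-17';
    Statement: Array<{ Action: 'execute-api:Invoke'; Effect: Effect; Resource: string }>;
  };
  // Surfaces in the connect handler as event.requestContext.authorizer
  context?: Record<string, string>;
}

function buildAuthorizerResponse(
  principalId: string,
  effect: Effect,
  methodArn: string,
  context?: Record<string, string>
): AuthorizerResponse {
  return {
    principalId,
    policyDocument: {
      Version: '2012-10-17',
      Statement: [{ Action: 'execute-api:Invoke', Effect: effect, Resource: methodArn }],
    },
    ...(context ? { context } : {}),
  };
}

// Placeholder check (assumption): treats "userId:username" as a valid token.
function verifyToken(token: string | undefined): { userId: string; username: string } | null {
  if (!token) return null;
  const [userId, username] = token.split(':');
  return userId && username ? { userId, username } : null;
}

function authorize(token: string | undefined, methodArn: string): AuthorizerResponse {
  const user = verifyToken(token);
  return user
    ? buildAuthorizerResponse(user.userId, 'Allow', methodArn, user)
    : buildAuthorizerResponse('anonymous', 'Deny', methodArn);
}

const exampleArn = 'arn:aws:execute-api:us-east-1:123456789012:abc123/dev/$connect';
console.log(authorize('u1:alice', exampleArn).context);                             // { userId: 'u1', username: 'alice' }
console.log(authorize(undefined, exampleArn).policyDocument.Statement[0].Effect);   // Deny
```

In an actual authorizer Lambda the token would come from the $connect request (for example a query string parameter) and the ARN from event.methodArn.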
// src/handlers/websocket/disconnect.ts
import { WebSocketEvent, WebSocketResponse } from '../../types/websocket';
import { ConnectionService } from '../../services/connection-service';
import { WebSocketService } from '../../services/websocket-service';
const connectionService = new ConnectionService(process.env.TABLE_NAME!);
const websocketService = new WebSocketService(
`https://${process.env.API_ID}.execute-api.${process.env.REGION}.amazonaws.com/${process.env.STAGE}`,
connectionService
);
export const handler = async (event: WebSocketEvent): Promise<WebSocketResponse> => {
console.log('WebSocket disconnect event:', JSON.stringify(event, null, 2));
try {
const { connectionId } = event.requestContext;
// Get connection data before deleting
const connection = await connectionService.getConnection(connectionId);
if (connection && connection.roomId) {
// Notify room members that user left
await websocketService.broadcastToRoom({
type: 'user_left',
data: {
userId: connection.userId,
username: connection.username,
leftAt: new Date().toISOString(),
},
roomId: connection.roomId,
excludeConnectionId: connectionId,
});
}
// Remove connection from database
await connectionService.deleteConnection(connectionId);
console.log(`Connection ${connectionId} removed`);
return { statusCode: 200 };
} catch (error) {
console.error('Error handling disconnection:', error);
return { statusCode: 500 };
}
};
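The handlers assemble the management endpoint from the API_ID, REGION, and STAGE environment variables. The same URL can usually be derived from the incoming event, since requestContext already carries domainName and stage. A minimal sketch, assuming the API is reached through its default execute-api domain (with a custom domain name the environment-variable approach above is the safer choice); managementEndpoint is a hypothetical helper:

```typescript
interface RequestContextLike {
  domainName: string;
  stage: string;
}

// Hypothetical helper: builds the HTTPS endpoint used for posting to connections
function managementEndpoint(ctx: RequestContextLike): string {
  return `https://${ctx.domainName}/${ctx.stage}`;
}

console.log(
  managementEndpoint({ domainName: 'abc123.execute-api.us-east-1.amazonaws.com', stage: 'dev' })
); // https://abc123.execute-api.us-east-1.amazonaws.com/dev
```

The trade-off is that the client can no longer be created once at module load, because the endpoint is only known when an event arrives.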
// src/handlers/websocket/message.ts
import {
WebSocketEvent,
WebSocketResponse,
WebSocketMessage,
WebSocketAction,
MessageType
} from '../../types/websocket';
import { ConnectionService } from '../../services/connection-service';
import { ChatService } from '../../services/chat-service';
import { WebSocketService } from '../../services/websocket-service';
const connectionService = new ConnectionService(process.env.TABLE_NAME!);
const chatService = new ChatService(process.env.TABLE_NAME!);
const websocketService = new WebSocketService(
`https://${process.env.API_ID}.execute-api.${process.env.REGION}.amazonaws.com/${process.env.STAGE}`,
connectionService
);
export const handler = async (event: WebSocketEvent): Promise<WebSocketResponse> => {
console.log('WebSocket message event:', JSON.stringify(event, null, 2));
try {
const { connectionId } = event.requestContext;
if (!event.body) {
await websocketService.sendError(connectionId, 'Message body is required');
return { statusCode: 400 };
}
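// Reject malformed JSON with a 400 instead of letting it surface as a 500
let message: WebSocketMessage;
try {
message = JSON.parse(event.body);
} catch {
await websocketService.sendError(connectionId, 'Message body must be valid JSON');
return { statusCode: 400 };
}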
if (!message.action) {
await websocketService.sendError(connectionId, 'Action is required', message.requestId);
return { statusCode: 400 };
}
const connection = await connectionService.getConnection(connectionId);
if (!connection) {
await websocketService.sendError(connectionId, 'Connection not found', message.requestId);
return { statusCode: 404 };
}
await handleAction(message, connection, connectionId);
return { statusCode: 200 };
} catch (error) {
console.error('Error handling message:', error);
try {
const { connectionId } = event.requestContext;
await websocketService.sendError(connectionId, 'Internal server error');
} catch (sendError) {
console.error('Error sending error message:', sendError);
}
return { statusCode: 500 };
}
};
async function handleAction(
message: WebSocketMessage,
connection: any,
connectionId: string
): Promise<void> {
switch (message.action) {
case WebSocketAction.JOIN_ROOM:
await handleJoinRoom(message, connection, connectionId);
break;
case WebSocketAction.LEAVE_ROOM:
await handleLeaveRoom(message, connection, connectionId);
break;
case WebSocketAction.SEND_MESSAGE:
await handleSendMessage(message, connection, connectionId);
break;
case WebSocketAction.EDIT_MESSAGE:
await handleEditMessage(message, connection, connectionId);
break;
case WebSocketAction.DELETE_MESSAGE:
await handleDeleteMessage(message, connection, connectionId);
break;
case WebSocketAction.LIST_ROOMS:
await handleListRooms(message, connection, connectionId);
break;
case WebSocketAction.CREATE_ROOM:
await handleCreateRoom(message, connection, connectionId);
break;
case WebSocketAction.GET_ROOM_HISTORY:
await handleGetRoomHistory(message, connection, connectionId);
break;
case WebSocketAction.TYPING_START:
case WebSocketAction.TYPING_STOP:
await handleTypingIndicator(message, connection, connectionId);
break;
default:
await websocketService.sendError(connectionId, `Unknown action: ${message.action}`, message.requestId);
}
}
async function handleJoinRoom(
message: WebSocketMessage,
connection: any,
connectionId: string
): Promise<void> {
const { roomId } = message.data || {};
if (!roomId) {
await websocketService.sendError(connectionId, 'Room ID is required', message.requestId);
return;
}
const room = await chatService.getRoom(roomId);
if (!room) {
await websocketService.sendError(connectionId, 'Room not found', message.requestId);
return;
}
// Check if user has permission to join room
if (room.isPrivate && !room.members.includes(connection.userId)) {
await websocketService.sendError(connectionId, 'Access denied', message.requestId);
return;
}
// Update connection with room info
await connectionService.updateConnectionRoom(connectionId, roomId);
// Notify room members that user joined
await websocketService.broadcastToRoom({
type: 'user_joined',
data: {
userId: connection.userId,
username: connection.username,
joinedAt: new Date().toISOString(),
},
roomId,
excludeConnectionId: connectionId,
});
// Send success response with room info
await websocketService.sendSuccess(connectionId, {
room,
message: 'Successfully joined room',
}, message.requestId);
}
async function handleLeaveRoom(
message: WebSocketMessage,
connection: any,
connectionId: string
): Promise<void> {
if (!connection.roomId) {
await websocketService.sendError(connectionId, 'Not currently in a room', message.requestId);
return;
}
const roomId = connection.roomId;
// Update connection to remove room
await connectionService.updateConnectionRoom(connectionId, null);
// Notify room members that user left
await websocketService.broadcastToRoom({
type: 'user_left',
data: {
userId: connection.userId,
username: connection.username,
leftAt: new Date().toISOString(),
},
roomId,
excludeConnectionId: connectionId,
});
await websocketService.sendSuccess(connectionId, {
message: 'Successfully left room',
}, message.requestId);
}
async function handleSendMessage(
message: WebSocketMessage,
connection: any,
connectionId: string
): Promise<void> {
const { content, messageType = MessageType.TEXT, replyToMessageId } = message.data || {};
if (!connection.roomId) {
await websocketService.sendError(connectionId, 'Must be in a room to send messages', message.requestId);
return;
}
if (!content || content.trim().length === 0) {
await websocketService.sendError(connectionId, 'Message content is required', message.requestId);
return;
}
const chatMessage = await chatService.sendMessage({
roomId: connection.roomId,
userId: connection.userId,
username: connection.username,
content: content.trim(),
messageType,
replyToMessageId,
});
// Broadcast message to all room members
await websocketService.broadcastToRoom({
type: 'message',
data: chatMessage,
roomId: connection.roomId,
});
await websocketService.sendSuccess(connectionId, {
message: 'Message sent successfully',
messageId: chatMessage.messageId,
}, message.requestId);
}
async function handleEditMessage(
message: WebSocketMessage,
connection: any,
connectionId: string
): Promise<void> {
const { messageId, content } = message.data || {};
if (!messageId || !content) {
await websocketService.sendError(connectionId, 'Message ID and content are required', message.requestId);
return;
}
const editedMessage = await chatService.editMessage(messageId, content.trim(), connection.userId);
if (!editedMessage) {
await websocketService.sendError(connectionId, 'Message not found or access denied', message.requestId);
return;
}
// Broadcast updated message to room members
await websocketService.broadcastToRoom({
type: 'message',
data: {
...editedMessage,
edited: true,
},
roomId: editedMessage.roomId,
});
await websocketService.sendSuccess(connectionId, {
message: 'Message edited successfully',
}, message.requestId);
}
async function handleDeleteMessage(
message: WebSocketMessage,
connection: any,
connectionId: string
): Promise<void> {
const { messageId } = message.data || {};
if (!messageId) {
await websocketService.sendError(connectionId, 'Message ID is required', message.requestId);
return;
}
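// Assumes ChatService exposes deleteMessage(messageId, userId), resolving to true only when the caller owns the message
const deleted = await chatService.deleteMessage(messageId, connection.userId);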
if (!deleted) {
await websocketService.sendError(connectionId, 'Message not found or access denied', message.requestId);
return;
}
// Broadcast deletion to room members
if (connection.roomId) {
await websocketService.broadcastToRoom({
type: 'message',
data: {
messageId,
deleted: true,
deletedAt: new Date().toISOString(),
},
roomId: connection.roomId,
});
}
await websocketService.sendSuccess(connectionId, {
message: 'Message deleted successfully',
}, message.requestId);
}
async function handleListRooms(
message: WebSocketMessage,
connection: any,
connectionId: string
): Promise<void> {
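const { isPrivate = false, limit = 50 } = message.data || {};
const rooms = await chatService.listRooms(isPrivate, limit);
// Only expose private rooms the caller is a member of
const visibleRooms = rooms.filter(
room => !room.isPrivate || room.members.includes(connection.userId)
);
await websocketService.sendSuccess(connectionId, {
rooms: visibleRooms,
}, message.requestId);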
}
async function handleCreateRoom(
message: WebSocketMessage,
connection: any,
connectionId: string
): Promise<void> {
const { name, description, isPrivate = false, members = [] } = message.data || {};
if (!name || name.trim().length === 0) {
await websocketService.sendError(connectionId, 'Room name is required', message.requestId);
return;
}
const room = await chatService.createRoom({
name: name.trim(),
description: description?.trim(),
createdBy: connection.userId,
isPrivate,
members,
});
await websocketService.sendSuccess(connectionId, {
room,
message: 'Room created successfully',
}, message.requestId);
}
async function handleGetRoomHistory(
message: WebSocketMessage,
connection: any,
connectionId: string
): Promise<void> {
const { roomId, limit = 50, exclusiveStartKey } = message.data || {};
if (!roomId) {
await websocketService.sendError(connectionId, 'Room ID is required', message.requestId);
return;
}
const room = await chatService.getRoom(roomId);
if (!room) {
await websocketService.sendError(connectionId, 'Room not found', message.requestId);
return;
}
// Check access permissions
if (room.isPrivate && !room.members.includes(connection.userId)) {
await websocketService.sendError(connectionId, 'Access denied', message.requestId);
return;
}
const history = await chatService.getRoomMessages(roomId, limit, exclusiveStartKey);
await websocketService.sendSuccess(connectionId, history, message.requestId);
}
async function handleTypingIndicator(
message: WebSocketMessage,
connection: any,
connectionId: string
): Promise<void> {
if (!connection.roomId) {
return; // Silently ignore if not in a room
}
const isTyping = message.action === WebSocketAction.TYPING_START;
await websocketService.broadcastToRoom({
type: 'typing',
data: {
userId: connection.userId,
username: connection.username,
isTyping,
timestamp: new Date().toISOString(),
},
roomId: connection.roomId,
excludeConnectionId: connectionId,
});
}
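One gap worth noting: handleDeleteMessage calls chatService.deleteMessage, which the ChatService listing earlier does not include. The sketch below shows the contract the handler appears to rely on, namely returning true only when the message exists and belongs to the caller. It uses an in-memory Map as a stand-in for the DynamoDB table, so every name here is hypothetical. Against DynamoDB, the same rule could be expressed by looking up the item key through GSI1 and issuing a DeleteCommand with the condition userId = :userId, in the same way editMessage guards its update.

```typescript
interface StoredMessage {
  messageId: string;
  roomId: string;
  userId: string;
  content: string;
}

// In-memory stand-in for the chat table (assumption, for illustration only)
class InMemoryMessageStore {
  private messages = new Map<string, StoredMessage>();

  put(message: StoredMessage): void {
    this.messages.set(message.messageId, message);
  }

  has(messageId: string): boolean {
    return this.messages.has(messageId);
  }

  // Returns true only when the message exists and was written by userId
  deleteMessage(messageId: string, userId: string): boolean {
    const existing = this.messages.get(messageId);
    if (!existing || existing.userId !== userId) {
      return false; // unknown message or not the author
    }
    this.messages.delete(messageId);
    return true;
  }
}

const store = new InMemoryMessageStore();
store.put({ messageId: 'm1', roomId: 'r1', userId: 'alice', content: 'hello' });
console.log(store.deleteMessage('m1', 'bob'));   // false (not the author)
console.log(store.deleteMessage('m1', 'alice')); // true
console.log(store.deleteMessage('m1', 'alice')); // false (already deleted)
```

A real implementation would be asynchronous and return a Promise<boolean>, which matches how the handler awaits the call.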
SAM Template for WebSocket API
Create the infrastructure template:
# template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Parameters:
Stage:
Type: String
Default: dev
Description: API Gateway deployment stage
Globals:
Function:
Timeout: 30
Runtime: nodejs18.x
Environment:
Variables:
TABLE_NAME: !Ref ChatTable
STAGE: !Ref Stage
Resources:
# WebSocket API
ChatWebSocketApi:
Type: AWS::ApiGatewayV2::Api
Properties:
Name: !Sub "chat-websocket-${Stage}"
ProtocolType: WEBSOCKET
RouteSelectionExpression: "$request.body.action"
# WebSocket API Stage
ChatWebSocketStage:
Type: AWS::ApiGatewayV2::Stage
Properties:
ApiId: !Ref ChatWebSocketApi
StageName: !Ref Stage
AutoDeploy: true
# DynamoDB Table
ChatTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub "chat-table-${Stage}"
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: pk
AttributeType: S
- AttributeName: sk
AttributeType: S
- AttributeName: gsi1pk
AttributeType: S
- AttributeName: gsi1sk
AttributeType: S
- AttributeName: gsi2pk
AttributeType: S
- AttributeName: gsi2sk
AttributeType: S
KeySchema:
- AttributeName: pk
KeyType: HASH
- AttributeName: sk
KeyType: RANGE
GlobalSecondaryIndexes:
- IndexName: GSI1
KeySchema:
- AttributeName: gsi1pk
KeyType: HASH
- AttributeName: gsi1sk
KeyType: RANGE
Projection:
ProjectionType: ALL
- IndexName: GSI2
KeySchema:
- AttributeName: gsi2pk
KeyType: HASH
- AttributeName: gsi2sk
KeyType: RANGE
Projection:
ProjectionType: ALL
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
# Lambda Functions
ConnectFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: dist/
Handler: handlers/websocket/connect.handler
Environment:
Variables:
API_ID: !Ref ChatWebSocketApi
REGION: !Ref AWS::Region
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref ChatTable
DisconnectFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: dist/
Handler: handlers/websocket/disconnect.handler
Environment:
Variables:
API_ID: !Ref ChatWebSocketApi
REGION: !Ref AWS::Region
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref ChatTable
- Statement:
- Effect: Allow
Action:
- execute-api:ManageConnections
Resource: !Sub "arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${ChatWebSocketApi}/${Stage}/POST/@connections/*"
MessageFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: dist/
Handler: handlers/websocket/message.handler
Environment:
Variables:
API_ID: !Ref ChatWebSocketApi
REGION: !Ref AWS::Region
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref ChatTable
- Statement:
- Effect: Allow
Action:
- execute-api:ManageConnections
Resource: !Sub "arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${ChatWebSocketApi}/${Stage}/POST/@connections/*"
# WebSocket Routes
ConnectRoute:
Type: AWS::ApiGatewayV2::Route
Properties:
ApiId: !Ref ChatWebSocketApi
RouteKey: $connect
AuthorizationType: NONE
Target: !Join ['/', ['integrations', !Ref ConnectIntegration]]
DisconnectRoute:
Type: AWS::ApiGatewayV2::Route
Properties:
ApiId: !Ref ChatWebSocketApi
RouteKey: $disconnect
AuthorizationType: NONE
Target: !Join ['/', ['integrations', !Ref DisconnectIntegration]]
DefaultRoute:
Type: AWS::ApiGatewayV2::Route
Properties:
ApiId: !Ref ChatWebSocketApi
RouteKey: $default
AuthorizationType: NONE
Target: !Join ['/', ['integrations', !Ref MessageIntegration]]
# Integrations
ConnectIntegration:
Type: AWS::ApiGatewayV2::Integration
Properties:
ApiId: !Ref ChatWebSocketApi
IntegrationType: AWS_PROXY
IntegrationUri: !Sub "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${ConnectFunction.Arn}/invocations"
DisconnectIntegration:
Type: AWS::ApiGatewayV2::Integration
Properties:
ApiId: !Ref ChatWebSocketApi
IntegrationType: AWS_PROXY
IntegrationUri: !Sub "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${DisconnectFunction.Arn}/invocations"
MessageIntegration:
Type: AWS::ApiGatewayV2::Integration
Properties:
ApiId: !Ref ChatWebSocketApi
IntegrationType: AWS_PROXY
IntegrationUri: !Sub "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${MessageFunction.Arn}/invocations"
# Lambda Permissions
ConnectPermission:
Type: AWS::Lambda::Permission
Properties:
Action: lambda:InvokeFunction
FunctionName: !Ref ConnectFunction
Principal: apigateway.amazonaws.com
SourceArn: !Sub "arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${ChatWebSocketApi}/*"
DisconnectPermission:
Type: AWS::Lambda::Permission
Properties:
Action: lambda:InvokeFunction
FunctionName: !Ref DisconnectFunction
Principal: apigateway.amazonaws.com
SourceArn: !Sub "arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${ChatWebSocketApi}/*"
MessagePermission:
Type: AWS::Lambda::Permission
Properties:
Action: lambda:InvokeFunction
FunctionName: !Ref MessageFunction
Principal: apigateway.amazonaws.com
SourceArn: !Sub "arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${ChatWebSocketApi}/*"
Outputs:
WebSocketUrl:
Description: "WebSocket API URL"
Value: !Sub "wss://${ChatWebSocketApi}.execute-api.${AWS::Region}.amazonaws.com/${Stage}"
TableName:
Description: "DynamoDB table name"
Value: !Ref ChatTable
ApiId:
Description: "WebSocket API ID"
Value: !Ref ChatWebSocketApi
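The template passes API_ID and REGION to the handlers, and the WebSocketUrl output shows the host format that API Gateway uses. As a minimal sketch, the helpers below derive both the client-facing wss:// URL and the https:// endpoint that the @connections management calls target. The names WebSocketApiConfig, buildWebSocketUrl, and buildManagementEndpoint are hypothetical and not part of the template. The stage value is assumed to come from requestContext.stage or from an extra environment variable.

```typescript
// Hypothetical helper types and functions; none of these names appear in the template.
interface WebSocketApiConfig {
  apiId: string;  // e.g. process.env.API_ID, set by the template
  region: string; // e.g. process.env.REGION, set by the template
  stage: string;  // assumption: taken from event.requestContext.stage
}

// Shared host/path portion: {apiId}.execute-api.{region}.amazonaws.com/{stage}
function apiHostPath({ apiId, region, stage }: WebSocketApiConfig): string {
  if (!apiId || !region || !stage) {
    throw new Error('apiId, region, and stage are all required');
  }
  return `${apiId}.execute-api.${region}.amazonaws.com/${stage}`;
}

// URL that browser clients connect to (same shape as the WebSocketUrl output).
function buildWebSocketUrl(config: WebSocketApiConfig): string {
  return `wss://${apiHostPath(config)}`;
}

// HTTPS endpoint for posting messages back to connections from a handler.
function buildManagementEndpoint(config: WebSocketApiConfig): string {
  return `https://${apiHostPath(config)}`;
}
```

In a handler, the result of buildManagementEndpoint would typically be passed as the endpoint option when constructing the AWS SDK v3 ApiGatewayManagementApiClient, so the execute-api:ManageConnections permission granted above applies to the same API and stage.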
Client-Side TypeScript Integration
Create a TypeScript client for WebSocket communication:
// client/websocket-client.ts
export class ChatWebSocketClient {
private ws: WebSocket | null = null;
private url: string;
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
private reconnectDelay = 1000;
private pingInterval: ReturnType<typeof setInterval> | null = null;
private messageHandlers = new Map<string, (data: any) => void>();
private requestHandlers = new Map<string, { resolve: (data: any) => void; reject: (error: any) => void }>();
constructor(url: string) {
this.url = url;
}
connect(token?: string): Promise<void> {
return new Promise((resolve, reject) => {
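// Encode the token so characters such as '+', '/', or '=' survive the query string
const wsUrl = token ? `${this.url}?authorization=${encodeURIComponent(token)}` : this.url;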
this.ws = new WebSocket(wsUrl);
this.ws.onopen = () => {
console.log('WebSocket connected');
this.reconnectAttempts = 0;
this.startPing();
resolve();
};
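this.ws.onmessage = (event) => {
// Skip frames that are not valid JSON instead of throwing inside the event handler
let parsed: any;
try {
parsed = JSON.parse(event.data);
} catch (error) {
console.error('Failed to parse WebSocket message:', error);
return;
}
this.handleMessage(parsed);
};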
this.ws.onclose = (event) => {
console.log('WebSocket disconnected:', event.code, event.reason);
this.stopPing();
if (!event.wasClean && this.reconnectAttempts < this.maxReconnectAttempts) {
setTimeout(() => {
this.reconnectAttempts++;
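// A failed attempt rejects; handle it here so it is not an unhandled rejection.
// The onclose handler of the new socket schedules the next retry.
this.connect(token).catch((error) => console.error('Reconnect attempt failed:', error));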
}, this.reconnectDelay * Math.pow(2, this.reconnectAttempts));
}
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
reject(error);
};
});
}
disconnect(): void {
this.stopPing();
if (this.ws) {
this.ws.close(1000, 'Client disconnect');
this.ws = null;
}
}
sendMessage(action: string, data: any): Promise<any> {
return new Promise((resolve, reject) => {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
reject(new Error('WebSocket not connected'));
return;
}
const requestId = generateId();
const message = {
action,
data,
requestId,
};
this.requestHandlers.set(requestId, { resolve, reject });
this.ws.send(JSON.stringify(message));
// Timeout after 30 seconds
setTimeout(() => {
if (this.requestHandlers.has(requestId)) {
this.requestHandlers.delete(requestId);
reject(new Error('Request timeout'));
}
}, 30000);
});
}
onMessage(type: string, handler: (data: any) => void): void {
this.messageHandlers.set(type, handler);
}
offMessage(type: string): void {
this.messageHandlers.delete(type);
}
// Convenience methods
async joinRoom(roomId: string): Promise<any> {
return this.sendMessage('joinRoom', { roomId });
}
async leaveRoom(): Promise<any> {
return this.sendMessage('leaveRoom', {});
}
async sendChatMessage(content: string, replyToMessageId?: string): Promise<any> {
return this.sendMessage('sendMessage', { content, replyToMessageId });
}
async editMessage(messageId: string, content: string): Promise<any> {
return this.sendMessage('editMessage', { messageId, content });
}
async deleteMessage(messageId: string): Promise<any> {
return this.sendMessage('deleteMessage', { messageId });
}
async createRoom(name: string, description?: string, isPrivate = false, members: string[] = []): Promise<any> {
return this.sendMessage('createRoom', { name, description, isPrivate, members });
}
async listRooms(isPrivate?: boolean): Promise<any> {
return this.sendMessage('listRooms', { isPrivate });
}
async getRoomHistory(roomId: string, limit = 50): Promise<any> {
return this.sendMessage('getRoomHistory', { roomId, limit });
}
startTyping(): void {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ action: 'typingStart', data: {} }));
}
}
stopTyping(): void {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ action: 'typingStop', data: {} }));
}
}
private handleMessage(message: any): void {
if (message.requestId && this.requestHandlers.has(message.requestId)) {
const handler = this.requestHandlers.get(message.requestId)!;
this.requestHandlers.delete(message.requestId);
if (message.type === 'error') {
handler.reject(new Error(message.error));
} else {
handler.resolve(message.data);
}
return;
}
// Handle broadcast messages
const handler = this.messageHandlers.get(message.type);
if (handler) {
handler(message.data);
}
}
private startPing(): void {
this.pingInterval = setInterval(() => {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ action: 'ping', data: {} }));
}
}, 30000) as any;
}
private stopPing(): void {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
}
}
function generateId(): string {
return Math.random().toString(36).substring(2) + Date.now().toString(36);
}
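The reconnect logic in the onclose handler waits reconnectDelay * 2^reconnectAttempts milliseconds before each retry. The sketch below isolates that formula so the resulting schedule is easy to inspect and test. The function names and the maxDelayMs cap are assumptions added for illustration; the client above applies no upper bound.

```typescript
// Hypothetical helpers mirroring the client's backoff formula.
// maxDelayMs is an added assumption; the client code itself has no cap.
function reconnectDelayMs(attempt: number, baseDelayMs = 1000, maxDelayMs = 30000): number {
  if (!Number.isInteger(attempt) || attempt < 0) {
    throw new RangeError('attempt must be a non-negative integer');
  }
  // Same formula as the client: base * 2^attempt, then clamped to the cap.
  return Math.min(baseDelayMs * Math.pow(2, attempt), maxDelayMs);
}

// Delays for attempts 0 .. maxAttempts - 1, in order.
function reconnectSchedule(maxAttempts = 5, baseDelayMs = 1000, maxDelayMs = 30000): number[] {
  return Array.from({ length: maxAttempts }, (_, attempt) =>
    reconnectDelayMs(attempt, baseDelayMs, maxDelayMs)
  );
}
```

With the client's defaults (five attempts, 1000 ms base), reconnectSchedule() yields 1000, 2000, 4000, 8000, and 16000 ms, so a client gives up roughly 31 seconds after the first unclean close. Adding random jitter to each delay is a common refinement when many clients may reconnect at once.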
Conclusion
Building real-time applications with AWS WebSockets and TypeScript creates powerful, scalable communication systems that can handle thousands of concurrent users. The serverless approach eliminates infrastructure management while providing automatic scaling and cost optimization through pay-per-use pricing.
The patterns demonstrated in this post, from connection management and message routing to room-based broadcasting and typing indicators, provide a solid foundation for building sophisticated real-time applications. Typing the message contracts shared by the client and the handlers catches many protocol mismatches at compile time, which helps keep the application reliable and maintainable as it scales and evolves.
This concludes our comprehensive journey through AWS and TypeScript, covering Lambda functions, Step Functions, SNS/SQS messaging, API Gateway, DynamoDB, CDK infrastructure as code, and WebSocket real-time communication. Together, these technologies provide a complete toolkit for building modern, scalable serverless applications that are both powerful and maintainable.