Websocket connection with client app and Lambda backend¶
The client can be a mobile app with a push mechanism to get data from backend feed to the app, or a chat bot to maintain a connection with the bot backend and the client app.
Architecture¶
The architecture uses a set of Lambda and API Gateway with SebScoket protocol. The SAM extract looks like
Websocket:
Type: AWS::ApiGatewayV2::Api
Properties:
Name: SessionManagementWebsocketApi
ProtocolType: WEBSOCKET
RouteSelectionExpression: "$request.body.action"
The client app, creates a unique id, and establishes a connection to the API gateway with $connect
which routes to a Lambda, as declared in this SAM section:
ConnectRoute:
Type: AWS::ApiGatewayV2::Route
Properties:
ApiId: !Ref Websocket
RouteKey: $connect
AuthorizationType: NONE
OperationName: ConnectRoute
Target: !Join [ '/', ['integrations', !Ref ConnectInteg] ]
ConnectInteg:
Type: AWS::ApiGatewayV2::Integration
Properties:
ApiId: !Ref Websocket
Description: Connect Integration
IntegrationType: AWS_PROXY
IntegrationUri: !Sub arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${OnConnectFunction.Arn}/invocations
The unique ID is kept in the session browser data store.
The unique ID is added to the Sec-WebSocket-Protocol header. Another way to pass the unique ID to the backend could be a query string parameter. Keep the Sec-WebSocket-Protocol in the backend response:
{
'statusCode': 200,
'headers': {
'Sec-WebSocket-Protocol': userId
}
}
If using authentication with Oauth service, the ID comes from a JWT token or other form of authentication and uses it instead of a randomly generated unique ID.
In API Gateway, the WebSocket APIs are routes integrated with HTTP backend, Lambda or other supporting services. The connection is bi-directional. The WebSocket API generates the connection ID automatically.
Client app sends a WebSocket upgrade request. If the request succeeds, the $connect
route is executed while the connection is being established. Authorization should be done in the $connect. Use $connect when we need to be aware of the client app, and be able to send messages from backend. When we need to maintain connection information with the different connected client apps, then a connection ID is used and an external database need to keep information of the connection ID, user id, and may the state of the communication. The primary key is the user id, this helps locate users faster as they reconnect.
ddb.put_item(
TableName=table_name,
Item={
'userId': {'S': userId},
'connectionId': {'S': connection_id},
'domainName': {'S': domain_name},
'stage': {'S': stage},
'lastSeen' : {'N': last_seen},
'active': {'S': True}
}
)
Using the put_item
function, we don’t need to query the DB if the user already exists. If it is a new user, Put creates a new item. Use a global secondary index in DynamoDB to locate the connection ID, to mark the connection inactive when WebSocket API calls OnDisconnect
.
See the SAM template declaration for the DynamoDB tables, keys and global indexes
ConnectionsTable:
Type: AWS::DynamoDB::Table
Properties:
AttributeDefinitions:
- AttributeName: "userId"
AttributeType: "S"
- AttributeName: "connectionId"
AttributeType: "S"
- AttributeName: "active"
AttributeType: "S"
- AttributeName: "lastSeen"
AttributeType: "N"
KeySchema:
- AttributeName: "userId"
KeyType: "HASH"
GlobalSecondaryIndexes:
- IndexName: connectionId-index
Projection:
ProjectionType: ALL
ProvisionedThroughput:
WriteCapacityUnits: 5
ReadCapacityUnits: 5
KeySchema:
- KeyType: HASH
AttributeName: connectionId
- IndexName: lastSeen-index
Projection:
ProjectionType: KEYS_ONLY
ProvisionedThroughput:
WriteCapacityUnits: 5
ReadCapacityUnits: 5
KeySchema:
- KeyType: HASH
AttributeName: active
- KeyType: RANGE
AttributeName: lastSeen
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
SSESpecification:
SSEEnabled: True
SessionsTable:
Type: AWS::DynamoDB::Table
Properties:
AttributeDefinitions:
- AttributeName: "userId"
AttributeType: "S"
KeySchema:
- AttributeName: "userId"
KeyType: "HASH"
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
SSESpecification:
SSEEnabled: True
Once the record in a table, the application has connection management, and can retrieve session data. The business function, is specific to each business application, but will need to search for the connection ID, using the DynamoDB index and get the state of the connection.
def get_user_id(connection_id):
response = ddb.query(
TableName=connections_table_name,
IndexName='connectionId-index',
KeyConditionExpression='connectionId = :c',
ExpressionAttributeValues={
':c': {'S': connection_id}
}
)
items = response['Items']
if len(items) == 1:
return items[0]['userId']['S']
return None
Another table could be added to maintain the state of the session. For example if the source of the messages to send back to a client comes from Kafka, it can be the last commited offset, or if messages are coming from a FIFO SQS Queue, it can be a time stamp of the last pushed message, or the last message itself. The code to push to the websocket looks like this:
try:
api_client.post_to_connection(
ConnectionId=connection_id,
Data=bytes(message_to_send, 'utf-8')
)
except api_client.exceptions.GoneException as e:
print(f"Found stale connection, persisting state")
store_cursor_position(user_id, position_in_data_source)
return {
'statusCode': 410
}
In chat bot a snapshot of the conversation needs to be set up and persisted.
The $disconnect
route is executed after the connection is closed. The active attribute in the connection table is set to false. Session record may be kept.
The delete function is run every x minutes and look at potential non-active connection or connection too old (user did not reconnect). Or use the onDelete WebSocket verb to route to a lambda doing the cleaning in connection and session tables based on the unique ID.
When using DynamoDB, it provides a built-in mechanism for expiring items called Time to Live (TTL)