Tag: websocket

  • Building a WebSocket Service with AWS Lambda & DynamoDB

    WebSocket is an effective way for full-duplex, real-time communication between a web server and a client. It is widely used for building real-time web applications along with helper libraries that offer better features. Implementing WebSockets requires a persistent connection between two parties. Serverless functions are known for short execution time and non-persistent behavior. However, with the API Gateway support for WebSocket endpoints, it is possible to implement a Serverless service built on AWS Lambda, API Gateway, and DynamoDB.

    Prerequisites

    A basic understanding of real-time web applications will help with this implementation. Throughout this article, we will be using Serverless Framework for developing and deploying the WebSocket service. Also, Node.js is used to write the business logic. 

    Behind the scenes, Serverless uses Cloudformation to create various required resources, like API Gateway APIs, AWS Lambda functions, IAM roles and policies, etc.

    Why Serverless?

    Serverless Framework abstracts the complex syntax needed for creating the Cloudformation stacks and helps us focus on the business logic of the services. Along with that, there are a variety of plugins available that help developing serverless applications easier.

    Why DynamoDB?

    We need persistent storage for WebSocket connection data, along with AWS Lambda. DynamoDB, a serverless key-value database from AWS, offers low latency, making it a great fit for storing and retrieving WebSocket connection details.

    Overview

    In this application, we’ll be creating an AWS Lambda service that accepts the WebSocket connections coming via API Gateway. The connections and subscriptions to topics are persisted using DynamoDB. We will be using ws for implementing basic WebSocket clients for the demonstration. The implementation has a Lambda consuming WebSocket that receives the connections and handles the communication. 

    Base Setup

    We will be using the default Node.js boilerplate offered by Serverless as a starting point.

    serverless create --template aws-nodejs

    A few of the Serverless plugins are installed and used to speed up the development and deployment of the Serverless stack. We also add the webpack config given here to support the latest JS syntax.

    Adding Lambda role and policies:

    The lambda function requires a role attached to it that has enough permissions to access DynamoDB and Execute API. These are the links for the configuration files:

    Link to dynamoDB.yaml

    Link to lambdaRole.yaml

    Adding custom config for plugins:

    The plugins used for local development must have the custom config added in the yaml file.

    This is how our serverless.yaml file should look like after the base serverless configuration:

    service: websocket-app
    frameworkVersion: '2'
    custom:
     dynamodb:
       stages:
         - dev
       start:
         port: 8000
         inMemory: true
         heapInitial: 200m
         heapMax: 1g
         migrate: true
         convertEmptyValues: true
     webpack:
       keepOutputDirectory: true
       packager: 'npm'
       includeModules:
         forceExclude:
           - aws-sdk
     
    provider:
     name: aws
     runtime: nodejs12.x
     lambdaHashingVersion: 20201221
    plugins:
     - serverless-dynamodb-local
     - serverless-plugin-existing-s3
     - serverless-dotenv-plugin
     - serverless-webpack
     - serverless-offline
    resources:
     - Resources: ${file(./config/dynamoDB.yaml)}
     - Resources: ${file(./config/lambdaRoles.yaml)}
    functions:
     hello:
       handler: handler.hello

    Add WebSocket Lambda:

    We need to create a lambda function that accepts WebSocket events from API Gateway. As you can see, we’ve defined 3 WebSocket events for the lambda function.

    • $connect
    • $disconnect
    • $default

    These 3 events stand for the default routes that come with WebSocket API Gateway offering. $connect and $disconnect are used for initialization and termination of the socket connection, where $default route is for data transfer.

    functions:
     websocket:
       handler: lambda/websocket.handler
       events:
         - websocket:
             route: $connect
         - websocket:
             route: $disconnect
         - websocket:
             route: $default

    We can go ahead and update how data is sent and add custom WebSocket routes to the application.

    The lambda needs to establish a connection with the client and handle the subscriptions. The logic for updating the DynamoDB is written in a utility class client. Whenever a connection is received, we create a record in the topics table.

    console.log(`Received socket connectionId: ${event.requestContext && event.requestContext.connectionId}`);
           if (!(event.requestContext && event.requestContext.connectionId)) {
               throw new Error('Invalid event. Missing `connectionId` parameter.');
           }
           const connectionId = event.requestContext.connectionId;
           const route = event.requestContext.routeKey;
           console.log(`data from ${connectionId} ${event.body}`);
           const connection = new Client(connectionId);
           const response = { statusCode: 200, body: '' };
     
           if (route === '$connect') {
               console.log(`Route ${route} - Socket connectionId connectedconected: ${event.requestContext && event.requestContext.connectionId}`);
               await new Client(connectionId).connect();
               return response;
           } 

    The Client utility class internally creates a record for the given connectionId in the DynamoDB topics table.

    async subscribe({ topic, ttl }) {
       return dynamoDBClient
         .put({ 
            Item: {
             topic,
             connectionId: this.connectionId,
            ttl: typeof ttl === 'number' ? ttl : Math.floor(Date.now() / 1000) + 60 * 60 * 2,
           },
           TableName: process.env.TOPICS_TABLE,
         }).promise();
     }

    Similarly, for the $disconnect route, we remove the INITIAL_CONNECTION topic record when a client disconnects.

    else if (route === '$disconnect') {
     console.log(`Route ${route} - Socket disconnected: ${ event.requestContext.connectionId}`);
               await new Client(connectionId).unsubscribe();
               return response;
           }

    The client.unsubscribe method internally removes the connection entry from the DynamoDB table. Here, the getTopics method fetches all the topics the particular client has subscribed to.

    async unsubscribe() {
       const topics = await this.getTopics();
       if (!topics) {
         throw Error(`Topics got undefined`);
       }
       return this.removeTopics({
         [process.env.TOPICS_TABLE]: topics.map(({ topic, connectionId }) => ({
           DeleteRequest: { Key: { topic, connectionId } },
         })),
       });
     }

    Now comes the default route part of the lambda where we customize message handling. In this implementation, we’re relaying our message handling based on the event.body.type, which indicates what kind of message is received from the client to server. The subscribe type here is used to subscribe to new topics. Similarly, the message type is used to receive the message from one client and then publish it to other clients who have subscribed to the same topic as the sender.

    console.log(`Route ${route} - data from ${connectionId}`);
               if (!event.body) {
                   return response;
               }
               let body = JSON.parse(event.body);
               const topic = body.topic;
               if (body.type === 'subscribe') {
                   connection.subscribe({ topic });
                   console.log(`Client subscribing for topic: ${topic}`);
               }
               if (body.type === 'message') {
                   await new Topic(topic).publishMessage({ data: body.message });
                   console.error(`Published messages to subscribers`);
                   return response;
               }
               return response;

    Similar to $connect, the subscribe type of payload, when received, creates a new subscription for the mentioned topic.

    Publishing the messages

    Here is the interesting part of this lambda. When a client sends a payload with type message, the lambda calls the publishMessage method with the data received. The method gets the subscribers active for the topic and publishes messages using another utility TopicSubscriber.sendMessage

    async publishMessage(data) {
       const subscribers = await this.getSubscribers();
       const promises = subscribers.map(async ({ connectionId, subscriptionId }) => {
         const TopicSubscriber = new Client(connectionId);
           const res = await TopicSubscriber.sendMessage({
             id: subscriptionId,
             payload: { data },
             type: 'data',
           });
           return res;
       });
       return Promise.all(promises);
     }

    The sendMessage executes the API endpoint, which is the API Gateway URL after deployment. As we’re using serverless-offline for the local development, the IS_OFFLINE env variable is automatically set.

    const endpoint =  process.env.IS_OFFLINE ? 'http://localhost:3001' : process.env.PUBLISH_ENDPOINT;
       console.log('publish endpoint', endpoint);
       const gatewayClient = new ApiGatewayManagementApi({
         apiVersion: '2018-11-29',
         credentials: config,
         endpoint,
       });
       return gatewayClient
         .postToConnection({
           ConnectionId: this.connectionId,
           Data: JSON.stringify(message),
         })
         .promise();

    Instead of manually invoking the API endpoint, we can also use DynamoDB streams to trigger a lambda asynchronously and publish messages to topics.

    Implementing the client

    For testing the socket implementation, we will be using a node.js script ws-client.js. This creates two nodejs ws clients: one that sends the data and another that receives it.

    const WebSocket = require('ws');
    const sockedEndpoint = 'http://0.0.0.0:3001';
    const ws1 = new WebSocket(sockedEndpoint, {
     perMessageDeflate: false
    });
    const ws2 = new WebSocket(sockedEndpoint, {
     perMessageDeflate: false
    });

    The first client on connect sends the data at an interval of one second to a topic named general. The count increments each send.

    ws1.on('open', () => {
       console.log('WS1 connected');
       let count = 0;
       setInterval(() => {
         const data = {
           type: 'message',
           message: `count is ${count}`,
           topic: 'general'
         }
         const message  = JSON.stringify(data);
         ws1.send(message, (err) => {
           if(err) {
             console.log(`Error occurred while send data ${err.message}`)
           }
           console.log(`WS1 OUT ${message}`);
         })
         count++;
       }, 15000)
    })

    The second client on connect will first subscribe to the general topic and then attach a handler for receiving data.

    ws2.on('open', () => {
     console.log('WS2 connected');
     const data = {
       type: 'subscribe',
       topic: 'general'
     }
     ws2.send(JSON.stringify(data), (err) => {
       if(err) {
         console.log(`Error occurred while send data ${err.message}`)
       }
     })
    });
    ws2.on('message', ( message) => {
     console.log(`ws2 IN ${message}`);
    });

    Once the service is running, we should be able to see the following output, where the two clients successfully sharing and receiving the messages with our socket server.

    Conclusion

    With API Gateway WebSocket support and DynamoDB, we’re able to implement persistent socket connections using serverless functions. The implementation can be improved and can be as complex as needed.

    WebSocket is an effective way for full-duplex, real-time communication between a web server and a client. It is widely used for building real-time web applications along with helper libraries that offer better features. Implementing WebSockets requires a persistent connection between two parties. Serverless functions are known for short execution time and non-persistent behavior. However, with the API Gateway support for WebSocket endpoints, it is possible to implement a Serverless service built on AWS Lambda, API Gateway, and DynamoDB.

  • A Beginner’s Guide to Python Tornado

    The web is a big place now. We need to support thousands of clients at a time, and here comes Tornado. Tornado is a Python web framework and asynchronous network library, originally developed at FriendFreed.

    Tornado uses non-blocking network-io. Due to this, it can handle thousands of active server connections. It is a saviour for applications where long polling and a large number of active connections are maintained.

    Tornado is not like most Python frameworks. It’s not based on WSGI, while it supports some features of WSGI using module `tornado.wsgi`. It uses an event loop design that makes Tornado request execution faster.  

    What is Synchronous Program?

    A function blocks, performs its computation, and returns, once done . A function may block for many reasons: network I/O, disk I/O, mutexes, etc.

    Application performance depends on how efficiently application uses CPU cycles, that’s why blocking statements/calls must be taken seriously. Consider password hashing functions like bcrypt, which by design use hundreds of milliseconds of CPU time, far more than a typical network or disk access. As the CPU is not idle, there is no need to go for asynchronous functions.

    A function can be blocking in one, and non-blocking in others. In the context of Tornado, we generally consider blocking due to network I/O and disk, although all kinds of blocking need to be minimized.

    What is Asynchronous Program?

    1) Single-threaded architecture:

        Means, it can’t do computation-centric tasks parallely.

    2) I/O concurrency:

        It can handover IO tasks to the operating system and continue to the next task to achieve parallelism.

    3) epoll/ kqueue:

        Underline system-related construct that allows an application to get events on a file descriptor or I/O specific tasks.

    4) Event loop:

        It uses epoll or kqueue to check if any event has happened, and executes callback that is waiting for those network events.

    Asynchronous vs Synchronous Web Framework:

    In case of synchronous model, each request or task is transferred to thread or routing, and as it finishes, the result is handed over to the caller. Here, managing things are easy, but creating new threads is too much overhead.

    On the other hand, in Asynchronous framework, like Node.js, there is a single-threaded model, so very less overhead, but it has complexity.

    Let’s imagine thousands of requests coming through and a server uses event loop and callback. Now, until request gets processed, it has to efficiently store and manage the state of that request to map callback result to the actual client.

    Node.js vs Tornado

    Most of these comparison points are tied to actual programming language and not the framework: 

    • Node.js has one big advantage that all of its libraries are Async. In Python, there are lots of available packages, but very few of them are asynchronous
    • As Node.js is JavaScript runtime, and we can use JS for both front and back-end, developers can keep only one codebase and share the same utility library
    • Google’s V8 engine makes Node.js faster than Tornado. But a lot of Python libraries are written in C and can be faster alternatives.

    A Simple ‘Hello World’ Example

    import tornado.ioloop
    import tornado.web
    
    class MainHandler(tornado.web.RequestHandler):
        def get(self):
            self.write("Hello, world")
    
    def make_app():
        return tornado.web.Application([
            (r"/", MainHandler),
        ])
    
    if __name__ == "__main__":
        app = make_app()
        app.listen(8888)
        tornado.ioloop.IOLoop.current().start()

    Note: This example does not use any asynchronous feature.

    Using AsyncHTTPClient module, we can do REST call asynchronously.

    from tornado.httpclient import AsyncHTTPClient
    from tornado import gen
    
    @gen.coroutine
    def async_fetch_gen(url):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch(url)
        raise gen.Return(response.body)

    As you can see `yield http_client.fetch(url)` will run as a coroutine.

    Complex Example of Tornado Async

    Please have a look at Asynchronous Request handler.

    WebSockets Using Tornado:

    Tornado has built-in package for WebSockets that can be easily used with coroutines to achieve concurrency, here is one example:

    import logging
    import tornado.escape
    import tornado.ioloop
    import tornado.options
    import tornado.web
    import tornado.websocket
    from tornado.options import define, options
    from tornado.httpserver import HTTPServer
    
    define("port", default=8888, help="run on the given port", type=int)
    
    
    # queue_size = 1
    # producer_num_items = 5
    # q = queues.Queue(queue_size)
    
    def isPrime(num):
        """
        Simple worker but mostly IO/network call
        """
        if num > 1:
            for i in range(2, num // 2):
                if (num % i) == 0:
                    return ("is not a prime number")
            else:
                return("is a prime number")
        else:
            return ("is not a prime number")
    
    class Application(tornado.web.Application):
        def __init__(self):
            handlers = [(r"/chatsocket", TornadoWebSocket)]
            super(Application, self).__init__(handlers)
    
    class TornadoWebSocket(tornado.websocket.WebSocketHandler):
        clients = set()
    
        # enable cross domain origin
        def check_origin(self, origin):
            return True
    
        def open(self):
            TornadoWebSocket.clients.add(self)
    
        # when client closes connection
        def on_close(self):
            TornadoWebSocket.clients.remove(self)
    
        @classmethod
        def send_updates(cls, producer, result):
    
            for client in cls.clients:
    
                # check if result is mapped to correct sender
                if client == producer:
                    try:
                        client.write_message(result)
                    except:
                        logging.error("Error sending message", exc_info=True)
    
        def on_message(self, message):
            try:
                num = int(message)
            except ValueError:
                TornadoWebSocket.send_updates(self, "Invalid input")
                return
            TornadoWebSocket.send_updates(self, isPrime(num))
    
    def start_websockets():
        tornado.options.parse_command_line()
        app = Application()
        server = HTTPServer(app)
        server.listen(options.port)
        tornado.ioloop.IOLoop.current().start()
    
    
    
    if __name__ == "__main__":
        start_websockets()

    One can use a WebSocket client application to connect to the server, message can be any integer. After processing, the client receives the result if the integer is prime or not.  
    Here is one more example of actual async features of Tornado. Many will find it similar to Golang’s Goroutine and channels.

    In this example, we can start worker(s) and they will listen to the ‘tornado.queue‘. This queue is asynchronous and very similar to the asyncio package.

    # Example 1
    from tornado import gen, queues
    from tornado.ioloop import IOLoop
    
    @gen.coroutine
    def consumer(queue, num_expected):
        for _ in range(num_expected):
            # heavy I/O or network task
            print('got: %s' % (yield queue.get()))
    
    
    @gen.coroutine
    def producer(queue, num_items):
        for i in range(num_items):
            print('putting %s' % i)
            yield queue.put(i)
    
    @gen.coroutine
    def main():
        """
        Starts producer and consumer and wait till they finish
        """
        yield [producer(q, producer_num_items), consumer(q, producer_num_items)]
    
    queue_size = 1
    producer_num_items = 5
    q = queues.Queue(queue_size)
    
    results = IOLoop.current().run_sync(main)
    
    
    # Output:
    # putting 0
    # putting 1
    # got: 0
    # got: 1
    # putting 2
    # putting 3
    # putting 4
    # got: 2
    # got: 3
    # got: 4
    
    
    # Example 2
    # Condition
    # A condition allows one or more coroutines to wait until notified.
    from tornado import gen
    from tornado.ioloop import IOLoop
    from tornado.locks import Condition
    
    my_condition = Condition()
    
    @gen.coroutine
    def waiter():
        print("I'll wait right here")
        yield my_condition.wait()
        print("Received notification now doing my things")
    
    @gen.coroutine
    def notifier():
        yield gen.sleep(60)
        print("About to notify")
        my_condition.notify()
        print("Done notifying")
    
    @gen.coroutine
    def runner():
        # Wait for waiter() and notifier() in parallel
        yield([waiter(), notifier()])
    
    results = IOLoop.current().run_sync(runner)
    
    
    # output:
    
    # I'll wait right here
    # About to notify
    # Done notifying
    # Received notification now doing my things

    Conclusion

    1) Asynchronous frameworks are not much of use when most of the computations are CPU centric and not I/O.

    2) Due to a single thread per core model and event loop, it can manage thousands of active client connections.

    3) Many say Django is too big, Flask is too small, and Tornado is just right:)

  • Scalable Real-time Communication With Pusher

    What and why?

    Pusher is a hosted API service which makes adding real-time data and functionality to web and mobile applications seamless. 

    Pusher works as a real-time communication layer between the server and the client. It maintains persistent connections at the client using WebSockets, as and when new data is added to your server. If a server wants to push new data to clients, they can do it instantly using Pusher. It is highly flexible, scalable, and easy to integrate. Pusher has exposed over 40+ SDKs that support almost all tech stacks.

    In the context of delivering real-time data, there are other hosted and self-hosted services available. It depends on the use case of what exactly one needs, like if you need to broadcast data across all the users or something more complex having specific target groups. In our use case, Pusher was well-suited, as the decision was based on the easy usage, scalability, private and public channels, webhooks, and event-based automation. Other options which we considered were Socket.IO, Firebase & Ably, etc. 

    Pusher is categorically well-suited for communication and collaboration features using WebSockets. The key difference with  Pusher: it’s a hosted service/API.  It takes less work to get started, compared to others, where you need to manage the deployment yourself. Once we do the setup, it comes to scaling, that reduces future efforts/work.

    Some of the most common use cases of Pusher are:

    1. Notification: Pusher can inform users if there is any relevant change.  Notifications can also be thought of as a form of signaling, where there is no representation of the notification in the UI. Still, it triggers a reaction within an application.

    2. Activity streams: Stream of activities which are published when something changes on the server or someone publishes it across all channels.

    3. Live Data Visualizations: Pusher allows you to broadcast continuously changing data when needed.

    4. Chats: You can use Pusher for peer to peer or peer to multichannel communication.

    In this blog, we will be focusing on using Channels, which is an alias for Pub/Sub messaging API for a JavaScript-based application. Pusher also comes with Chatkit and Beams (Push Notification) SDK/APIs.

    • Chatkit is designed to make chat integration to your app as simple as possible. It allows you to add group chat and 1 to 1 chat feature to your app. It also allows you to add file attachments and online indicators.
    • Beams are used for adding Push Notification in your Mobile App. It includes SDKs to seamlessly manage push token and send notifications.

    Step 1: Getting Started

    Setup your account on the Pusher dashboard and get your free API keys.

    Image Source: Pusher

    1. Click on Channels
    2. Create an App. Add details based on the project and the environment
    3. Click on the App Keys tab to get the app keys.
    4. You can also check the getting started page. It will give code snippets to get you started.

    Add Pusher to your project:

    var express = require('express');
    var bodyParser = require('body-parser');
    
    var app = express();
    app.use(bodyParser.json());
    app.use(bodyParser.urlencoded({ extended: false }));
    
    app.post('/pusher/auth', function(req, res) {
      var socketId = req.body.socket_id;
      var channel = req.body.channel_name;
      var auth = pusher.authenticate(socketId, channel);
      res.send(auth);
    });
    
    var port = process.env.PORT || 5000;
    app.listen(port);

    CODE: https://gist.github.com/velotiotech/f09f14363bacd51446d5318e5050d628.js

    or using npm

    npm i pusher

    CODE: https://gist.github.com/velotiotech/423115d0943c1b882c913e437c529d11.js

    Step 2: Subscribing to Channels

    There are three types of channels in Pusher: Public, Private, and Presence.

    • Public channels: These channels are public in nature, so anyone who knows the channel name can subscribe to the channel and start receiving messages from the channel. Public channels are commonly used to broadcast general/public information, which does not contain any secure information or user-specific data.
    • Private channels: These channels have an access control mechanism that allows the server to control who can subscribe to the channel and receive data from the channel. All private channels should have a private- prefixed to the name. They are commonly used when the sever needs to know who can subscribe to the channel and validate the subscribers.
    • Presence channels: It is an extension to the private channel. In addition to the properties which private channels have, it lets the server ‘register’ users information on subscription to the channel. It also enables other members to identify who is online.

    In your application, you can create a subscription and start listening to events on: 

    // Here my-channel is the channel name
    // all the event published to this channel would be available
    // once you subscribe to the channel and start listing to it.
    
    var channel = pusher.subscribe('my-channel');
    
    channel.bind('my-event', function(data) {
      alert('An event was triggered with message: ' + data.message);
    });

    CODE: https://gist.github.com/velotiotech/d8c27960e2fac408a8db57b92f1e846d.js

    Step 3: Creating Channels

    For creating channels, you can use the dashboard or integrate it with your server. For more details on how to integrate Pusher with your server, you can read (Server API). You need to create an app on your Pusher dashboard and can use it to further trigger events to your app.

    or 

    Integrate Pusher with your server. Here is a sample snippet from our node App:

    var Pusher = require('pusher');
    
    var pusher = new Pusher({
      appId: 'APP_ID',
      key: 'APP_KEY',
      secret: 'APP_SECRET',
      cluster: 'APP_CLUSTER'
    });
    
    // Logic which will then trigger events to a channel
    function trigger(){
    ...
    ...
    pusher.trigger('my-channel', 'my-event', {"message": "hello world"});
    ...
    ...
    }

    CODE: https://gist.github.com/velotiotech/6f5b0f6407c0a74a0bce4b398a849410.js

    Step 4: Adding Security

    As a default behavior, anyone who knows your public app key can open a connection to your channels app. This behavior does not add any security risk, as connections can only access data on channels. 

    For more advanced use cases, you need to use the “Authorized Connections” feature. It authorizes every single connection to your channels, and hence, avoids unwanted/unauthorized connection. To enable the authorization, set up an auth endpoint, then modify your client code to look like this.

    const channels = new Pusher(APP_KEY, {
      cluster: APP_CLUSTER,
      authEndpoint: '/your_auth_endpoint'
    });
    
    const channel = channels.subscribe('private-<channel-name>');

    CODE: https://gist.github.com/velotiotech/9369051e5661a95352f08b1fdd8bf9ed.js

    For more details on how to create an auth endpoint for your server, read this. Here is a snippet from Node.js app

    var express = require('express');
    var bodyParser = require('body-parser');
    
    var app = express();
    app.use(bodyParser.json());
    app.use(bodyParser.urlencoded({ extended: false }));
    
    app.post('/pusher/auth', function(req, res) {
      var socketId = req.body.socket_id;
      var channel = req.body.channel_name;
      var auth = pusher.authenticate(socketId, channel);
      res.send(auth);
    });
    
    var port = process.env.PORT || 5000;
    app.listen(port);

    CODE: https://gist.github.com/velotiotech/fb67d5efe3029174abc6991089a910e1.js

    Step 5: Scale as you grow

     

    Pusher comes with a wide range of plans which you can subscribe to based on your usage. You can scale your application as it grows. Here is a snippet from available plans for mode details you can refer this.

    Image Source: Pusher

    Conclusion

    This article has covered a brief description of Pusher, its use cases, and how you can use it to build a scalable real-time application. Using Pusher may vary based on different use cases; it is no real debate on what one can choose. Pusher approach is simple and API based. It enables developers to add real-time functionality to any application in very little time.

    If you want to get hands-on tutorials/blogs, please visit here.