Tag: async

  • Node.js – Async Your Way out of Callback Hell with Promises, Async & Async/Await

    In this blog, I will compare various methods to avoid the dreaded callback hells that are common in Node.js. What exactly am I talking about? Have a look at this piece of code below. Every child function executes only when the result of its parent function is available. Callbacks are the very essence of the unblocking (and hence performant) nature of Node.js.

    foo(arg, (err, val) => {
         if (err) {
              console.log(err);
         } else {
              val += 1;
              bar(val, (err1, val1) => {
                   if (err) {
                        console.log(err1);
                   } else {
                        val1 += 2;
                        baz(val1, (err2, result) => {
                             if (err2) {
                                  console.log(err2);
                             } else {
                                  result += 3;
                                  console.log(result); // 6
                             }
                        });
                   }
              });
         }
    });

    Convinced yet? Even though there is some seemingly unnecessary error handling done here, I assume you get the drift! The problem with such code is more than just indentation. Instead, our programs entire flow is based on side effects – one function only incidentally calling the inner function.

    There are multiple ways in which we can avoid writing such deeply nested code. Let’s have a look at our options:

    Promises

    According to the official specification, promise represents an eventual result of an asynchronous operation. Basically, it represents an operation that has not completed yet but is expected to in the future. The then method is a major component of a promise. It is used to get the return value (fulfilled or rejected) of a promise. Only one of these two values will ever be set. Let’s have a look at a simple file read example without using promises:

    fs.readFile(filePath, (err, result) => {
         if (err) { console.log(err); }
         console.log(data);
    });

    Now, if readFile function returned a promise, the same logic could be written like so:

    var fileReadPromise = fs.readFile(filePath);
    fileReadPromise.then(console.log, console.error)

    The fileReadPromise can then be passed around multiple times in a code where you need to read a file. This helps in writing robust unit tests for your code since you now only have to write a single test for a promise. And more readable code!

    Chaining using promises

    The then function itself returns a promise which can again be used to do the next operation. Changing the first code snippet to using promises results in this:

    foo(arg, (err, val) => {
         if (err) {
              console.log(err);
         } else {
              val += 1;
              bar(val, (err1, val1) => {
                   if (err) {
                        console.log(err1);
                   } else {
                        val1 += 2;
                        baz(val1, (err2, result) => {
                             if (err2) {
                                  console.log(err2);
                             } else {
                                  result += 3;
                                  console.log(result); // 6
                             }
                        });
                   }
              });
         }
    });

    As in evident, it makes the code more composed, readable and easier to maintain. Also, instead of chaining we could have used Promise.all. Promise.all takes an array of promises as input and returns a single promise that resolves when all the promises supplied in the array are resolved. Other useful information on promises can be found here.

    The async utility module

    Async is an utility module which provides a set of over 70 functions that can be used to elegantly solve the problem of callback hells. All these functions follow the Node.js convention of error-first callbacks which means that the first callback argument is assumed to be an error (null in case of success). Let’s try to solve the same foo-bar-baz problem using async module. Here is the code snippet:

    function foo(arg, callback) {
      if (arg < 0) {
        callback('error');
        return;
      }
      callback(null, arg+1);
    }
    
    function bar(arg, callback) {
      if (arg < 0) {
        callback('error');
        return;
      }
      callback(null, arg+2);
    }
    
    function baz(arg, callback) {
      if (arg < 0) {
        callback('error');
        return;
      }
      callback(null, arg+3);
    }
    
    async.waterfall([
      (cb) => {
        foo(0, cb);
      },
      (arg, cb) => {
        bar(arg, cb);
      },
      (arg, cb) => {
        baz(arg, cb);
      }
    ], (err, result) => {
      if (err) {
        console.log(err);
      } else {
        console.log(result); //6
      }
    });

    Here, I have used the async.waterfall function as an example. There are a multiple functions available according to the nature of the problem you are trying to solve like async.each – for parallel execution, async.eachSeries – for serial execution etc.

    Async/Await

    Now, this is one of the most exciting features coming to Javascript in near future. It internally uses promises but handles them in a more intuitive manner. Even though it seems like promises and/or 3rd party modules like async would solve most of the problems, a further simplification is always welcome! For those of you who have worked with C# async/await, this concept is directly cribbed from there and being brought into ES7. 

    Async/await enables us to write asynchronous promise-based code as if it were synchronous, but without blocking the main thread. An async function always returns a promise whether await is used or not. But whenever an await is observed, the function is paused until the promise either resolves or rejects. Following code snippet should make it clearer:

    async function asyncFun() {
      try {
        const result = await promise;
      } catch(error) {
        console.log(error);
      }
    }

    Here,  asyncFun is an async function which captures the promised result using await. This has made the code readable and a major convenience for developers who are more comfortable with linearly executed languages, without blocking the main thread. 

    Now, like before, lets solve the foo-bar-baz problem using async/await. Note that foo, bar and baz individually return promises just like before. But instead of chaining, we have written the code linearly.

    async fooBarBaz(arg) {
      try {
      const fooResponse = await foo(arg+1);
      const barResponse = await bar(arg+2);
      const bazResponse = await baz(arg+3);
    
      return bazResponse;
      } catch (error) {
        return Error(error);
      }
    }

    How long should you (a)wait for async to come to fore?

    Well, it’s already here in the Chrome 55 release and the latest update of the V8 engine.  The native support in the language means that we should see a much more widespread use of this feature. The only, catch is that if you would want to use async/await on a codebase which isn’t promise aware and based completely on callbacks, it probably will require a lot of wrapping the existing functions to make them usable.

    To wrap up, async/await definitely make coding numerous async operations an easier job. Although promises and callbacks would do the job for most, async/await looks like the way to make some architectural problems go away and improve code quality. 

  • A Beginner’s Guide to Python Tornado

    The web is a big place now. We need to support thousands of clients at a time, and here comes Tornado. Tornado is a Python web framework and asynchronous network library, originally developed at FriendFreed.

    Tornado uses non-blocking network-io. Due to this, it can handle thousands of active server connections. It is a saviour for applications where long polling and a large number of active connections are maintained.

    Tornado is not like most Python frameworks. It’s not based on WSGI, while it supports some features of WSGI using module `tornado.wsgi`. It uses an event loop design that makes Tornado request execution faster.  

    What is Synchronous Program?

    A function blocks, performs its computation, and returns, once done . A function may block for many reasons: network I/O, disk I/O, mutexes, etc.

    Application performance depends on how efficiently application uses CPU cycles, that’s why blocking statements/calls must be taken seriously. Consider password hashing functions like bcrypt, which by design use hundreds of milliseconds of CPU time, far more than a typical network or disk access. As the CPU is not idle, there is no need to go for asynchronous functions.

    A function can be blocking in one, and non-blocking in others. In the context of Tornado, we generally consider blocking due to network I/O and disk, although all kinds of blocking need to be minimized.

    What is Asynchronous Program?

    1) Single-threaded architecture:

        Means, it can’t do computation-centric tasks parallely.

    2) I/O concurrency:

        It can handover IO tasks to the operating system and continue to the next task to achieve parallelism.

    3) epoll/ kqueue:

        Underline system-related construct that allows an application to get events on a file descriptor or I/O specific tasks.

    4) Event loop:

        It uses epoll or kqueue to check if any event has happened, and executes callback that is waiting for those network events.

    Asynchronous vs Synchronous Web Framework:

    In case of synchronous model, each request or task is transferred to thread or routing, and as it finishes, the result is handed over to the caller. Here, managing things are easy, but creating new threads is too much overhead.

    On the other hand, in Asynchronous framework, like Node.js, there is a single-threaded model, so very less overhead, but it has complexity.

    Let’s imagine thousands of requests coming through and a server uses event loop and callback. Now, until request gets processed, it has to efficiently store and manage the state of that request to map callback result to the actual client.

    Node.js vs Tornado

    Most of these comparison points are tied to actual programming language and not the framework: 

    • Node.js has one big advantage that all of its libraries are Async. In Python, there are lots of available packages, but very few of them are asynchronous
    • As Node.js is JavaScript runtime, and we can use JS for both front and back-end, developers can keep only one codebase and share the same utility library
    • Google’s V8 engine makes Node.js faster than Tornado. But a lot of Python libraries are written in C and can be faster alternatives.

    A Simple ‘Hello World’ Example

    import tornado.ioloop
    import tornado.web
    
    class MainHandler(tornado.web.RequestHandler):
        def get(self):
            self.write("Hello, world")
    
    def make_app():
        return tornado.web.Application([
            (r"/", MainHandler),
        ])
    
    if __name__ == "__main__":
        app = make_app()
        app.listen(8888)
        tornado.ioloop.IOLoop.current().start()

    Note: This example does not use any asynchronous feature.

    Using AsyncHTTPClient module, we can do REST call asynchronously.

    from tornado.httpclient import AsyncHTTPClient
    from tornado import gen
    
    @gen.coroutine
    def async_fetch_gen(url):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch(url)
        raise gen.Return(response.body)

    As you can see `yield http_client.fetch(url)` will run as a coroutine.

    Complex Example of Tornado Async

    Please have a look at Asynchronous Request handler.

    WebSockets Using Tornado:

    Tornado has built-in package for WebSockets that can be easily used with coroutines to achieve concurrency, here is one example:

    import logging
    import tornado.escape
    import tornado.ioloop
    import tornado.options
    import tornado.web
    import tornado.websocket
    from tornado.options import define, options
    from tornado.httpserver import HTTPServer
    
    define("port", default=8888, help="run on the given port", type=int)
    
    
    # queue_size = 1
    # producer_num_items = 5
    # q = queues.Queue(queue_size)
    
    def isPrime(num):
        """
        Simple worker but mostly IO/network call
        """
        if num > 1:
            for i in range(2, num // 2):
                if (num % i) == 0:
                    return ("is not a prime number")
            else:
                return("is a prime number")
        else:
            return ("is not a prime number")
    
    class Application(tornado.web.Application):
        def __init__(self):
            handlers = [(r"/chatsocket", TornadoWebSocket)]
            super(Application, self).__init__(handlers)
    
    class TornadoWebSocket(tornado.websocket.WebSocketHandler):
        clients = set()
    
        # enable cross domain origin
        def check_origin(self, origin):
            return True
    
        def open(self):
            TornadoWebSocket.clients.add(self)
    
        # when client closes connection
        def on_close(self):
            TornadoWebSocket.clients.remove(self)
    
        @classmethod
        def send_updates(cls, producer, result):
    
            for client in cls.clients:
    
                # check if result is mapped to correct sender
                if client == producer:
                    try:
                        client.write_message(result)
                    except:
                        logging.error("Error sending message", exc_info=True)
    
        def on_message(self, message):
            try:
                num = int(message)
            except ValueError:
                TornadoWebSocket.send_updates(self, "Invalid input")
                return
            TornadoWebSocket.send_updates(self, isPrime(num))
    
    def start_websockets():
        tornado.options.parse_command_line()
        app = Application()
        server = HTTPServer(app)
        server.listen(options.port)
        tornado.ioloop.IOLoop.current().start()
    
    
    
    if __name__ == "__main__":
        start_websockets()

    One can use a WebSocket client application to connect to the server, message can be any integer. After processing, the client receives the result if the integer is prime or not.  
    Here is one more example of actual async features of Tornado. Many will find it similar to Golang’s Goroutine and channels.

    In this example, we can start worker(s) and they will listen to the ‘tornado.queue‘. This queue is asynchronous and very similar to the asyncio package.

    # Example 1
    from tornado import gen, queues
    from tornado.ioloop import IOLoop
    
    @gen.coroutine
    def consumer(queue, num_expected):
        for _ in range(num_expected):
            # heavy I/O or network task
            print('got: %s' % (yield queue.get()))
    
    
    @gen.coroutine
    def producer(queue, num_items):
        for i in range(num_items):
            print('putting %s' % i)
            yield queue.put(i)
    
    @gen.coroutine
    def main():
        """
        Starts producer and consumer and wait till they finish
        """
        yield [producer(q, producer_num_items), consumer(q, producer_num_items)]
    
    queue_size = 1
    producer_num_items = 5
    q = queues.Queue(queue_size)
    
    results = IOLoop.current().run_sync(main)
    
    
    # Output:
    # putting 0
    # putting 1
    # got: 0
    # got: 1
    # putting 2
    # putting 3
    # putting 4
    # got: 2
    # got: 3
    # got: 4
    
    
    # Example 2
    # Condition
    # A condition allows one or more coroutines to wait until notified.
    from tornado import gen
    from tornado.ioloop import IOLoop
    from tornado.locks import Condition
    
    my_condition = Condition()
    
    @gen.coroutine
    def waiter():
        print("I'll wait right here")
        yield my_condition.wait()
        print("Received notification now doing my things")
    
    @gen.coroutine
    def notifier():
        yield gen.sleep(60)
        print("About to notify")
        my_condition.notify()
        print("Done notifying")
    
    @gen.coroutine
    def runner():
        # Wait for waiter() and notifier() in parallel
        yield([waiter(), notifier()])
    
    results = IOLoop.current().run_sync(runner)
    
    
    # output:
    
    # I'll wait right here
    # About to notify
    # Done notifying
    # Received notification now doing my things

    Conclusion

    1) Asynchronous frameworks are not much of use when most of the computations are CPU centric and not I/O.

    2) Due to a single thread per core model and event loop, it can manage thousands of active client connections.

    3) Many say Django is too big, Flask is too small, and Tornado is just right:)

  • Understanding Node.js Async Flows: Parallel, Serial, Waterfall and Queues

    Promises in Javascript has been around for a long time now. It helped solve the problem of callback hell. But as soon as the requirements get complicated with control flows, promises start getting unmanageable and harder to work with. This is where async flows come to the rescue. In this blog, let’s talk about the various async flows which are used frequently rather than raw promises and callbacks.

    Async Utility Module

    Async is a utility module which provides straight-forward, powerful functions for working with asynchronous JavaScript. Although it is built on top of promises, it makes asynchronous code look and behave a little more like synchronous code, making it easier to read and maintain.

    Async utility has a number of control flows. Let’s discuss the most popular ones and their use cases:

    1. Parallel

    When we have to run multiple tasks independent of each other without waiting until the previous task has completed, parallel comes into the picture.

    async.parallel(tasks, callback)

    Tasks: A collection of functions to run. It can be an array, an object or any iterable.

    Callback: This is the callback where all the task results are passed and is executed once all the task execution has completed.

    In case an error is passed to a function’s callback, the main callback is immediately called with the error. Although parallel is about starting I/O tasks in parallel, it’s not about parallel execution since Javascript is single-threaded.

    An example of Parallel is shared below:

    async.parallel([
      function(callback) {
        setTimeout(function() {
          console.log('Task One');
          callback(null, 1);
        }, 200);
      },
      function(callback) {
        setTimeout(function() {
          console.log('Task Two');
          callback(null, 2);
        }, 100);
      }
    ],
    function(err, results) {
      console.log(results);
      // the results array will equal [1, 2] even though
      // the second function had a shorter timeout.
    });
    
    // an example using an object instead of an array
    async.parallel({
      task1: function(callback) {
        setTimeout(function() {
          console.log('Task One');
          callback(null, 1);
        }, 200);
      },
      task2: function(callback) {
        setTimeout(function() {
          console.log('Task Two');
          callback(null, 2);
        }, 100);
        }
    }, function(err, results) {
      console.log(results);
      // results now equals to: { task1: 1, task2: 2 }
    });

    2. Series

    When we have to run multiple tasks which depend on the output of the previous task, series comes to our rescue.

    async.series(tasks, callback)

    Tasks: A collection of functions to run. It can be an array, an object or any iterable.

    Callback: This is the callback where all the task results are passed and is executed once all the task execution has completed.

    Callback function receives an array of result objects when all the tasks have been completed. If an error is encountered in any of the task, no more functions are run but the final callback is called with the error value.

    An example of Series is shared below:

    async.series([
      function(callback) {
        console.log('one');
        callback(null, 1);
      },
      function(callback) {
        console.log('two');
        callback(null, 2);
      },
      function(callback) {
        console.log('three');
        callback(null, 3);
      }
    ],
    function(err, results) {
      console.log(result);
      // results is now equal to [1, 2, 3]
    });
    
    async.series({
      1: function(callback) {
        setTimeout(function() {
          console.log('Task 1');
          callback(null, 'one');
        }, 200);
      },
      2: function(callback) {
        setTimeout(function() {
          console.log('Task 2');
          callback(null, 'two');
        }, 300);
      },
      3: function(callback) {
        setTimeout(function() {
          console.log('Task 3');
          callback(null, 'three');
        }, 100);
      }
    },
    function(err, results) {
      console.log(results);
      // results is now equal to: { 1: 'one', 2: 'two', 3:'three' }
    });

    3. Waterfall

    When we have to run multiple tasks which depend on the output of previous task, Waterfall can be helpful.

    async.waterfall(tasks, callback)

    Tasks: A collection of functions to run. It can be an array, an object or any iterable structure.

    Callback: This is the callback where all the task results are passed and is executed once all the task execution has completed.

    It will run one function at a time and pass the result of the previous function to the next one.

    An example of Waterfall is shared below:

    async.waterfall([
      function(callback) {
        callback(null, 'Task 1', 'Task 2');
      },
      function(arg1, arg2, callback) {
        // arg1 now equals 'Task 1' and arg2 now equals 'Task 2'
        let arg3 = arg1 + ' and ' + arg2;
        callback(null, arg3);
      },
      function(arg1, callback) {
        // arg1 now equals 'Task1 and Task2'
        arg1 += ' completed';
        callback(null, arg1);
      }
    ], function(err, result) {
      // result now equals to 'Task1 and Task2 completed'
      console.log(result);
    });
    
    // Or, with named functions:
    async.waterfall([
      myFirstFunction,
      mySecondFunction,
      myLastFunction,
    ], function(err, result) {
      // result now equals 'Task1 and Task2 completed'
      console.log(result);
    });
    
    function myFirstFunction(callback) {
      callback(null, 'Task 1', 'Task 2');
    }
    function mySecondFunction(arg1, arg2, callback) {
      // arg1 now equals 'Task 1' and arg2 now equals 'Task 2'
      let arg3 = arg1 + ' and ' + arg2;
      callback(null, arg3);
    }
    function myLastFunction(arg1, callback) {
      // arg1 now equals 'Task1 and Task2'
      arg1 += ' completed';
      callback(null, arg1);
    }

    4. Queue

    When we need to run a set of tasks asynchronously, queue can be used. A queue object based on an asynchronous function can be created which is passed as worker.

    async.queue(task, concurrency)

    Task: Here, it takes two parameters, first – the task to be performed and second – the callback function.

    Concurrency: It is the number of functions to be run in parallel.

    async.queue returns a queue object that supports few properties:

    • push: Adds tasks to the queue to be processed.
    • drain: The drain function is called after the last task of the queue.
    • unshift: Adds tasks in front of the queue.

    An example of Queue is shared below:

    // create a queue object with concurrency 2
    var q = async.queue(function(task, callback) {
      console.log('Hello ' + task.name);
      callback();
    }, 2);
    
    // assign a callback
    q.drain = function() {
      console.log('All items have been processed');
    };
    
    // add some items to the queue
    q.push({name: 'foo'}, function(err) {
      console.log('Finished processing foo');
    });
    
    q.push({name: 'bar'}, function (err) {
      console.log('Finished processing bar');
    });
    
    // add some items to the queue (batch-wise)
    q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function(err) {
      console.log('Finished processing item');
    });
    
    // add some items to the front of the queue
    q.unshift({name: 'bar'}, function (err) {
      console.log('Finished processing bar');
    });

    5. Priority Queue

    It is the same as queue, the only difference being that a priority can be assigned to the tasks which is considered in ascending order.

    async.priorityQueue(task,concurrency)

    Task: Here, it takes three parameters:

    • First – task to be performed.
    • Second – priority, it is a number that determines the sequence of execution. For array of tasks, the priority remains same for all of them.
    • Third – Callback function.

    The async.priorityQueue does not support ‘unshift’ property of the queue.

    An example of Priority Queue is shared below:

    // create a queue object with concurrency 1
    var q = async.priorityQueue(function(task, callback) {
      console.log('Hello ' + task.name);
      callback();
    }, 1);
    
    // assign a callback
    q.drain = function() {
      console.log('All items have been processed');
    };
    
    // add some items to the queue with priority
    q.push({name: 'foo'}, 3, function(err) {
      console.log('Finished processing foo');
    });
    
    q.push({name: 'bar'}, 2, function (err) {
      console.log('Finished processing bar');
    });
    
    // add some items to the queue (batch-wise) which will have same priority
    q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], 1, function(err) {
      console.log('Finished processing item');
    });

    6. Race

    It runs all the tasks in parallel, but as soon as any of the function completes its execution or passes error to its callback, the main callback is immediately called.

    async.race(tasks, callback)

    Task: Here, it is a collection of functions to run. It is an array or any iterable.

    Callback: The result of the first complete execution is passed. It may be the result or error.

    An example of Race is shared below:

    async.race([
      function (callback) {
        setTimeout(function () {
          callback(null, 'one');
        }, 300);
      },
      function (callback) {
        setTimeout(function () {
          callback(null, 'two');
        }, 100);
      },
      function (callback) {
        setTimeout(function () {
          callback(null, 'three');
        }, 200);
      }
    ],
      // main callback
      function (err, result) {
        // the result will be equal to 'two' as it finishes earlier than the other 2
        console.log('The result is ', result);
      });

    Combining Async Flows

    In complex scenarios, the async flows like parallel and series can be combined and nested. This helps in achieving the expected output with the benefits of async utilities.

    However, the only difference between Waterfall and Series async utility is that the final callback in series receives an array of results of all the task whereas in Waterfall, the result object of the final task is received by the final callback.

    Conclusion

    Async Utilities has an upper hand over promises due to its concise and clean code, better error handling and easier debugging. It makes us realize how simple and easy asynchronous code can be without the syntactical mess of promises and callback hell.