Category: Engineering blogs

  • To Go Serverless Or Not Is The Question

    AWS Lambda launched in 2014 and brought serverless computing (Function as a Service) into the mainstream. We have been using Lambda on our projects for the last couple of years to build complete end-to-end web applications, combining AWS Lambda with API Gateway (REST APIs), CloudWatch (logs), S3 (website hosting and data storage), and so on.

    Google and Azure also provide serverless technologies like AWS does, and there is a popular open-source alternative, OpenWhisk. While implementing serverless applications on AWS, we have learned a lot about running a website on Lambda. Like every other technology, serverless has its own set of benefits and drawbacks, which we discuss here.

    What Does Serverless Mean?

    Serverless is a cloud computing execution model in which the servers are run by a cloud provider such as AWS, Google, or Azure. The code still runs on servers, but they are abstracted away from the users and provided as a service.

    The Serverless World

    There’s a lot of excitement about serverless in the industry, but there are also issues that sometimes outweigh the pros and require complex workarounds. AWS charges for each Lambda invocation in 100 ms increments. With EC2, when thousands of requests come in, we need to scale up servers to handle them; Lambda does this on its own, with no auto-scaling groups or load balancers to set up. But how much does it cost to use Lambda? Let’s compare.

    Let’s say, we have a serverless application with only 1 Lambda & 1 API Gateway,

    • API Gateway
      $3.50 per million API calls × 200 million API requests/month = 700 USD
    • Lambda
      $0.00001667 per GB-second × (200 million requests × 0.3 seconds per execution × 1 GB memory − 400k free-tier GB-seconds for a new account) ≈ 993 USD
    • Total ≈ 1693 USD/month (this is a lot)

    Now let’s look at an equivalent EC2 setup:

    • 3 highly available EC2 servers (m5.xlarge: 16 GB RAM, 4 vCPUs) = 416 USD
    • Application Load Balancer = 39 USD
    • Total = 455 USD/month

    So, comparing the above pricing, at this scale classic servers are cheaper than serverless.
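    The break-even point between the two models comes down to quick arithmetic, which can be sketched as below. The prices are illustrative assumptions in the spirit of the figures above, not current AWS list prices:

```python
# Rough monthly-cost sketch: pay-per-request serverless vs. a fixed EC2 fleet.
# All prices here are illustrative assumptions, not current AWS list prices.

def serverless_cost(requests, seconds_per_req=0.3, memory_gb=1.0,
                    gb_second_price=0.00001667, api_gw_per_million=3.50):
    """API Gateway + Lambda cost for a given monthly request volume."""
    api_gateway = (requests / 1_000_000) * api_gw_per_million
    lambda_compute = requests * seconds_per_req * memory_gb * gb_second_price
    return api_gateway + lambda_compute

def ec2_cost(instances=3, instance_monthly=138.67, load_balancer=39.0):
    """Flat cost of a fixed EC2 fleet behind an Application Load Balancer."""
    return instances * instance_monthly + load_balancer

# Serverless cost grows linearly with traffic; the EC2 fleet costs the same
# whether it serves one request or 200 million.
low, high = serverless_cost(1_000_000), serverless_cost(200_000_000)
print(f"1M req: {low:.0f} USD, 200M req: {high:.0f} USD, EC2 fleet: {ec2_cost():.0f} USD")
```

    At around one million requests a month, serverless costs a few dollars while the fixed fleet still costs roughly 455 USD; at hundreds of millions of requests, the relationship flips.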

    At low traffic, however, the economics reverse, which can be really useful for startups in their early stages where every rupee counts. Lambda charges only for the requests that actually hit your service, so the team spends less on management and more on development. For example, instead of setting up dedicated servers for your development environment, you can run it serverless.

    Loss Of Control

    One of the biggest disadvantages of serverless is that you don’t have control over your services. We use many services managed by third-party cloud providers, such as CloudWatch for logs and DynamoDB for databases. As your project grows, more and more functions need to be managed, and everything is handled by the cloud provider. You lose portability as soon as you integrate Lambda with other services like SNS, DynamoDB, or Kinesis, which results in vendor lock-in: it becomes difficult to change vendors later.

    On the other hand, in the non-serverless world, we manage our own language versions, queues, and database queries. Basically, all the code lives in one place, and we don’t need to manage multiple functions. But every technology has its pros and cons, as we said earlier: in serverless, the loss of control comes with a benefit, letting the team focus less on infrastructure and more on adding business value to the product.

    Choosing serverless or non-serverless depends entirely on the type of product. If you have a simple application, like selling cakes online, with straightforward implementation and authentication needs, you can go with serverless. But if your application is really complex, with heavy custom algorithms, and you need full control over your code, security, and authentication, you should go with non-serverless.

    Security Issues

    The biggest risk in serverless, or in using cloud services generally, is poorly configured functions, services, or applications. Bad configuration can lead to multiple issues in your application, security-related or infrastructure-related. Whichever cloud provider you use, AWS, GCP, or Azure, it is important to configure each function or service with only the permissions it needs to access other services, and to manage access controls. Otherwise you can end up with permission issues or a security breach. Also, if you connect any third-party APIs to your provider, make sure the connections are safe and the data is properly encrypted.

    Correct configuration is the most important thing in both serverless and non-serverless applications. Be strict about it when you use cloud services, and you will run into far fewer security breaches and permission issues.

    Testing & Debugging

    Serverless applications are hard to test. Normally, developers test code locally and then deploy it, but in the serverless world local testing is complicated, since few tools can fully mock cloud services in a local environment. So we need to perform a decent amount of integration testing before moving forward. Currently, you can test and debug code with console/print statements that show up in your CloudWatch logs, as in this Node.js snippet:

    const https = require('https')
    let url = "https://google.com"
    
    exports.handler = async function(event) {
      const promise = new Promise(function(resolve, reject) {
        console.log("Processing URL: " + url)
        https.get(url, (res) => {
          resolve(res.statusCode)
          // console for debugging / testing purposes.
          console.info("Request was successful!")
        }).on('error', (e) => {
          reject(new Error(e))
          // console for errors
          console.error("Error while processing: " + e)
        })
      })
      return promise
    }

    For serverless applications, it is important to give some time & effort upfront to architect your application correctly and create good integration tests over cloud infrastructure.

    It is difficult to test and debug applications in serverless. In non-serverless applications we just debug the code, but in serverless we need to debug the end-to-end integration with every service we use. Lambda execution environments are so short-lived that the live context is gone by the time you go looking for it, so in this situation we rely on services built for the job, such as AWS CloudWatch or Google Stackdriver.

    Cold Start

    (Figure: a regular cold start. Source: AWS re:Invent)

    An issue remains invisible until you trace it, and some technical issues are hard to find until you hit them. Lambda has one such drawback, known as a cold start. Your Lambda code runs on servers managed by Amazon, and to make this feasible Amazon doesn’t keep everyone’s code warm, i.e., it doesn’t keep execution environments for all functions running at all times. So, if your particular function hasn’t run in a while, a request has to wait for Lambda to spin up an execution environment and then invoke the code, which delays the response for that request.

    But wait for how long? I was using Node.js, and a cold start took around 4 seconds to respond. This is bad for the end-user experience and can impact your business; it is hard to tolerate in a world where requests need to respond fast.

    The problem is manageable with a handful of Lambdas, but what if the number grows? With 50 to 100 Lambdas, warming up every one of them gets annoying: you have to invoke each Lambda before the user does, which feels backwards, but there isn’t really a solution other than keeping them warm. I used the Serverless Framework for my implementation, and it helped me work around most of these problems with Lambdas and the other resources in our serverless applications.
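    The warming trick itself is simple. Below is a minimal sketch of the idea in Python (the functions above were Node.js, but the pattern is the same): a scheduled rule invokes the function every few minutes with a payload such as {"warmup": true}, and the handler returns early for those pings so they finish almost instantly. The event shape and the "warmup" key are our own convention for this sketch, not an AWS standard.

```python
# Minimal keep-warm sketch: a scheduled invocation carries {"warmup": true},
# and the handler short-circuits it so the ping only keeps the execution
# environment alive. The "warmup" key is a convention invented for this sketch.

def handler(event, context=None):
    if isinstance(event, dict) and event.get("warmup"):
        # Warm-up ping: do no real work, just keep the container warm.
        return {"statusCode": 200, "body": "warmed"}

    # ... normal request handling would go here ...
    return {"statusCode": 200, "body": "hello"}
```

    The Serverless Framework ecosystem has plugins that wire up this kind of scheduled ping for you, so you rarely need to write it by hand.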

    Conclusion

    Serverless has many problems, I agree, but which tech doesn’t? When you choose between serverless and non-serverless, make sure you study and analyze your requirements to decide which direction to take. If you want to ship quickly, with small applications, strict deadlines, and a tight budget, go with serverless; otherwise choose EC2 servers. It mainly depends on the requirements. If you do use serverless, frameworks will help you a lot, and it is worth comparing the pricing of the options.

    If you are new to serverless and want to implement it from scratch, there are good getting-started guides to follow.

    Serverless has its downsides today, but we hope Amazon and the other cloud providers will come up with good solutions to make it more efficient. We look forward to learning as the technology evolves.

  • How to Use Pytest Fixtures With Django Models

    With the test framework that Python and Django provide, there is a lot of boilerplate, and maintainability and duplication issues arise as your project grows. It’s also not a very Pythonic way of writing tests.

    Pytest provides a simple and more elegant way to write tests.

    It provides the ability to write tests as plain functions, which removes a lot of boilerplate and makes your code more readable and easier to maintain. Pytest also provides functionality for test discovery and for defining and using fixtures.

    Why Pytest Fixtures?

    When writing tests, it’s very common that a test needs objects, and the same objects may be needed by multiple tests. Creating these objects may involve a complicated process; repeating that process in each test case is painful, and on any model change we would need to update the logic everywhere. This creates code duplication and maintainability issues.

    To avoid all of this, we can use fixtures provided by pytest: we define the fixture in one place and then inject it into any test in a much simpler way.

    Briefly, fixtures are where we prepare everything for our test; they are everything the test needs to do its thing.

    We are going to explore how effectively we can use fixtures with Django models in a way that is readable and easy to maintain. Note that these are the fixtures provided by pytest, not to be confused with Django fixtures.

    Installation and Setup

    For this blog, we will set up a basic e-commerce application and set up the test suite for pytest.

    Creating Django App

    Before we begin testing, let’s create a basic e-commerce application and add a few models on which we can perform tests later.

    To create a Django app, go to the folder you want to work in, open the terminal, and run the below commands:

    $ django-admin startproject e_commerce_app
    $ cd e_commerce_app
    $ python manage.py startapp product

    Once the app is created, go to the settings.py and add the newly created product app to the INSTALLED_APPS.

    # Application definition
    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'product'
    ]

    Now, let’s create basic models in the models.py of the product app.

    from django.db import models
    
    class Retail(models.Model):
        name = models.CharField(max_length=128)
    
    class Category(models.Model):
        name = models.CharField(max_length=128, unique=True)
    
    class Product(models.Model):
        sku = models.CharField(max_length=50, unique=True)  # unique model number
        name = models.CharField(max_length=50)
        description = models.TextField(default="", blank=True)
        # mrp and weight are optional so minimal test objects can omit them
        mrp = models.DecimalField(max_digits=10, decimal_places=2, null=True, blank=True)
        weight = models.DecimalField(max_digits=10, decimal_places=2, null=True, blank=True)
        is_available = models.BooleanField(default=True)  # used by later examples
        retails = models.ManyToManyField(
            Retail,
            related_name="products",
            verbose_name="Retail stores that carry the product",
        )
        category = models.ForeignKey(
            Category,
            related_name="products",
            on_delete=models.CASCADE,
            blank=True,
            null=True,
        )
        date_created = models.DateTimeField(auto_now_add=True)
        date_modified = models.DateTimeField(auto_now=True)

    Here, each product will have a category and will be available at many retail stores. Now, let’s create the migrations and apply them:

    $ python manage.py makemigrations
    $ python manage.py migrate

    The models and database are now ready, and we can move on to writing test cases for these models.

    Let’s set up the pytest in our Django app first.

    For testing our Django applications with pytest, we will use the plugin pytest-django, which provides a set of useful tools for testing Django apps and projects. Let’s start with installing and configuring the plugin.

    Installing pytest

    Pytest can be installed with pip:

    $ pip install pytest-django

    Installing pytest-django will also automatically install the latest version of pytest. Once installed, we need to tell pytest-django where our settings.py file is located.

    The easiest way to do this is to create a pytest configuration file with this information.

    Create a file called pytest.ini in your project directory and add this content:

    [pytest]
    DJANGO_SETTINGS_MODULE=e_commerce_app.settings 

    You can provide various configurations in the file that will define how our tests should run.

    For example, to configure how test files should be detected across the project, we can add this line:

    [pytest]
    DJANGO_SETTINGS_MODULE=e_commerce_app.settings
    python_files = tests.py test_*.py *_tests.py

    Adding Test Suite to the Django App

    Django and pytest automatically detect and run test cases in files whose names start with ‘test’.

    In the product app folder, create a new package named tests. Then add a file called test_models.py, in which we will write all the model test cases for this app.

    $ cd product
    $ mkdir tests
    $ cd tests && touch test_models.py

    Running your Test Suite

    Tests are invoked directly with the pytest command:

    $ pytest                                # run all tests
    $ pytest tests                          # run tests in a directory
    $ pytest tests/test_models.py           # run tests in a single file

    For now, we are configured and ready for writing the first test with pytest and Django.

    Writing Tests with Pytest

    Here, we will write a few test cases to test the models we have written in the models.py file. To start with, let’s create a simple test case to test the category creation.

    from product.models import Category
    
    def test_create_category():
        category = Category.objects.create(name="Books")
        assert category.name == "Books"

    Now, try to execute this test from your command line:

    $ pytest
    ============================= test session starts ==============================
    platform linux -- Python 3.7.0, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
    django: settings: pytest_fixtures.settings (from ini)
    rootdir: /home/suraj/PycharmProjects/e_commerce_app, configfile: pytest.ini
    plugins: django-4.1.0
    collected 1 item                                                               
    
    product/tests/test_models.py F                                           [100%]
    
    =================================== FAILURES ===================================
    _____________________________ test_create_category _____________________________
    
        def test_create_category():
    >       category = Category.objects.create(name="Books")
    
    product/tests/test_models.py:5: 
    ...
    E       RuntimeError: Database access not allowed, use the "django_db" mark, or the "db" or "transactional_db" fixtures to enable it.
    
    venv/lib/python3.7/site-packages/django/db/backends/base/base.py:235: RuntimeError
    =========================== short test summary info ============================
    FAILED product/tests/test_models.py::test_create_category - RuntimeError: Dat...
    ============================== 1 failed in 0.21s ===============================

    The tests failed. If you look at the error, it has something to do with the database. The pytest-django docs say:

    pytest-django takes a conservative approach to enabling database access. By default your tests will fail if they try to access the database. Only if you explicitly request database access will this be allowed. This encourages you to keep database-needing tests to a minimum which makes it very clear what code uses the database.

    This means we need to explicitly provide database access to our test cases. For this, we need to use [pytest marks](https://docs.pytest.org/en/stable/mark.html#mark) to tell pytest-django your test needs database access.

    import pytest
    from product.models import Category
    
    @pytest.mark.django_db
    def test_create_category():
        category = Category.objects.create(name="Books")
        assert category.name == "Books"

    Alternatively, there is another way to access the database in test cases: the db helper fixture provided by pytest-django. Requesting db ensures the Django database is set up, and it is also the fixture to request from other fixtures that want to use the database themselves.

    from product.models import Category
    
    def test_create_category(db):
        category = Category.objects.create(name="Books")
        assert category.name == "Books"

    Going forward, we will use the db fixture approach as it promotes code reusability using fixtures.

    Run the test again:

    $ pytest
    ============================= test session starts ==============================
    platform linux -- Python 3.7.0, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
    django: settings: pytest_fixtures.settings (from ini)
    rootdir: /home/suraj/PycharmProjects/e_commerce_app, configfile: pytest.ini
    plugins: django-4.1.0
    collected 1 item                                                               
    
    product/tests/test_models.py .                                           [100%]
    
    ============================== 1 passed in 0.24s ===============================

    The command completed successfully and your test passed. Great! We have successfully written our first test case using pytest.

    Creating Fixtures for Django Models

    Now that you’re familiar with Django and pytest, let’s add test cases to check category filtering and updates.

    from product.models import Category
    
    def test_filter_category(db):
        Category.objects.create(name="Books")
        assert Category.objects.filter(name="Books").exists()
    
    def test_update_category(db):
        category = Category.objects.create(name="Books")
        category.name = "DVDs"
        category.save()
        category_from_db = Category.objects.get(name="DVDs")
        assert category_from_db.name == "DVDs"

    If you look at both test cases, you can see that neither is meant to test Category creation, yet the Category instance gets created twice, once per test case. Once the project grows, many test cases may need a Category instance, and if every test creates its own category, any change to the Category model becomes painful.

    This is where fixtures come to the rescue: they promote code reusability in your test cases. To reuse an object across many test cases, create a test fixture:

    import pytest
    from product.models import Category
    
    @pytest.fixture
    def category(db) -> Category:
        return Category.objects.create(name="Books")
    
    def test_filter_category(category):
        assert Category.objects.filter(name="Books").exists()
    
    def test_update_category(category):
        category.name = "DVDs"
        category.save()
        category_from_db = Category.objects.get(name="DVDs")
        assert category_from_db.name == "DVDs"

    Here, we have created a simple function called category and decorated it with @pytest.fixture to mark it as a fixture. It can now be injected into the test cases just like we injected the fixture db.

    Now, if a new requirement comes in that every category should have a description and a small icon to represent it (assume the Category model has grown matching fields), we don’t need to go to each test case and update the category-creation logic. We just update the fixture, in one place, and it takes effect in every test case.

    import pytest
    from product.models import Category
    
    @pytest.fixture
    def category(db) -> Category:
        return Category.objects.create(
            name="Books", description="Category of Books", icon="books.png"
        )

    Using fixtures, you can avoid code duplication and make tests more maintainable.
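    Once several test files need the same fixtures, a common next step is moving them into a conftest.py. Fixtures defined there are discovered by pytest automatically, so any test in that directory can request them without an import. A sketch, reusing the category fixture from above (the local import is just to keep the sketch self-contained):

```python
# product/tests/conftest.py
# Fixtures defined in conftest.py are auto-discovered by pytest; any test
# module in this directory can request them by name, with no import needed.
import pytest


@pytest.fixture
def category(db):
    # Local import keeps this sketch importable on its own; in the real
    # project a top-level `from product.models import Category` works too.
    from product.models import Category
    return Category.objects.create(name="Books")
```

    After this, test_models.py can drop its own category definition and simply accept category as an argument.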

    Parametrizing fixtures

    It is often useful to have a single fixture function run against different input values. This can be achieved with parametrized pytest fixtures.

    Let’s write the fixture for the product, and suppose the product’s SKU number must be 6 characters long and contain only alphanumeric characters.

    import pytest
    from product.models import Category, Product
    
    @pytest.fixture
    def product_one(db):
        return Product.objects.create(name="Book 1", sku="ABC123")
    
    def test_product_sku(product_one):
        assert all(letter.isalnum() for letter in product_one.sku)
        assert len(product_one.sku) == 6

    We now want to test against multiple SKU values and make sure the test validates all types of input. We can parametrize the fixture so that it creates three different product_one instances. The fixture function gets access to each parameter through the special request object:

    import pytest
    from product.models import Product
    
    @pytest.fixture(params=("ABC123", "123456", "ABCDEF"))
    def product_one(db, request):
        return Product.objects.create(name="Book 1", sku=request.param)
    
    def test_product_sku(product_one):
        assert all(letter.isalnum() for letter in product_one.sku)
        assert len(product_one.sku) == 6

    Fixture functions can be parametrized in which case they will be called multiple times, each time executing the set of dependent tests, i.e., the tests that depend on this fixture.

    Test functions usually do not need to be aware of their re-running. Fixture parametrization helps to write exhaustive functional tests for components that can be configured in multiple ways.

    Open the terminal and run the test:

    $ pytest
    ============================= test session starts ==============================
    platform linux -- Python 3.7.0, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
    django: settings: pytest_fixtures.settings (from ini)
    rootdir: /home/suraj/PycharmProjects/e_commerce_app, configfile: pytest.ini
    plugins: django-4.1.0
    collected 3 items                                                              
    
    product/tests/test_models.py ...                                         [100%]
    
    ============================== 3 passed in 0.27s ===============================

    We can see that our test_product_sku function ran thrice.
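    A related tool worth knowing alongside parametrized fixtures is parametrizing the test itself with @pytest.mark.parametrize, which suits cases where only one test needs the variations. Since this sketch only exercises the SKU format, it needs no database at all:

```python
import pytest


# The same three SKU inputs as above, driven from the test instead of the
# fixture. pytest runs the test once per parameter value.
@pytest.mark.parametrize("sku", ["ABC123", "123456", "ABCDEF"])
def test_sku_format(sku):
    assert all(ch.isalnum() for ch in sku)
    assert len(sku) == 6
```

    Fixture params are the better fit when many tests share the varied object; mark-based parametrize keeps the fixture simple when the variation is local to one test.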

    Injecting Fixtures Into Other Fixtures

    We will often come across cases where the object we need depends on some other object. Let’s try to create a few products under the category “Books”.

    import pytest
    
    from product.models import Category, Product
    
    @pytest.fixture
    def product_one(db):
        category = Category.objects.create(name="Books")
        return Product.objects.create(name="Book 1", sku="BK1", category=category)
    
    @pytest.fixture
    def product_two(db):
        category = Category.objects.create(name="Books")
        return Product.objects.create(name="Book 2", sku="BK2", category=category)
    
    def test_two_different_books_create(product_one, product_two):
        assert product_one.pk != product_two.pk

    If we try to test this in the terminal, we will encounter an error:

    $ pytest
    ============================= test session starts ==============================
    platform linux -- Python 3.7.0, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
    django: settings: pytest_fixtures.settings (from ini)
    rootdir: /home/suraj/PycharmProjects/e_commerce_app, configfile: pytest.ini
    plugins: django-4.1.0
    collected 1 item                                                               
    
    product/tests/test_models.py E                                           [100%]
    
    ==================================== ERRORS ====================================
    ______________ ERROR at setup of test_two_different_books_create _______________
    ...
    query = 'INSERT INTO "product_category" ("name") VALUES (?)', params = ['Books']
    ...
    E       django.db.utils.IntegrityError: UNIQUE constraint failed: product_category.name
    
    venv/lib/python3.7/site-packages/django/db/backends/sqlite3/base.py:413: IntegrityError
    =========================== short test summary info ============================
    ERROR product/tests/test_models.py::test_two_different_books_create - django....
    =============================== 1 error in 0.44s ===============================

    The test case throws an IntegrityError, saying we tried to create the “Books” category twice. And if you look at the code, we have created the category in both product_one and product_two fixtures. What could we have done better?

    If you look carefully, we have injected db in both the product_one and product_two fixtures, and db is just another fixture. So that means fixtures can be injected into other fixtures.

    One of pytest’s greatest strengths is its extremely flexible fixture system. It allows us to boil down complex requirements for tests into more simple and organized functions, where we only need to have each one describe the things they are dependent on.

    You can use this feature to address the IntegrityError above. Create the category fixture and inject it into both the product fixtures.

    import pytest
    from product.models import Category, Product
    
    @pytest.fixture
    def category(db) -> Category:
        return Category.objects.create(name="Books")
    
    @pytest.fixture
    def product_one(db, category):
        return Product.objects.create(name="Book 1", sku="BK1", category=category)
    
    @pytest.fixture
    def product_two(db, category):
        return Product.objects.create(name="Book 2", sku="BK2", category=category)
    
    def test_two_different_books_create(product_one, product_two):
        assert product_one.pk != product_two.pk

    If we try to run the test now, it should run successfully.

    $ pytest
    ============================= test session starts ==============================
    platform linux -- Python 3.7.0, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
    django: settings: pytest_fixtures.settings (from ini)
    rootdir: /home/suraj/PycharmProjects/e_commerce_app, configfile: pytest.ini
    plugins: django-4.1.0
    collected 1 item                                                               
    
    product/tests/test_models.py .                                           [100%]
    
    ============================== 1 passed in 0.20s ===============================

    By restructuring the fixtures this way, we have made the code easier to maintain. By simply injecting fixtures, we can manage a lot of complex model fixtures in a much simpler way.

    Let’s say we need an example where product one and product two are sold by the retail shop “ABC”. This can easily be achieved by injecting a retailer fixture into the product fixtures.

    import pytest
    from product.models import Category, Product, Retail
    
    @pytest.fixture
    def category(db) -> Category:
        return Category.objects.create(name="Books")
    
    @pytest.fixture
    def retailer_abc(db):
        return Retail.objects.create(name="ABC")
    
    @pytest.fixture
    def product_one(db, category, retailer_abc):
        product = Product.objects.create(name="Book 1", category=category)
        product.retails.add(retailer_abc)
        return product
    
    def test_product_retailer(db, retailer_abc, product_one):
        assert product_one.retails.filter(name=retailer_abc.name).exists()

    Autouse Fixtures

    Sometimes, you may want to have a fixture (or even several) that you know all your tests will depend on. “Autouse” fixtures are a convenient way of making all tests automatically request them. This can cut out a lot of redundant requests, and can even provide more advanced fixture usage.

    We can make a fixture an autouse fixture by passing in autouse=True to the fixture’s decorator. Here’s a simple example of how they can be used:

    import pytest
    from product.models import Category, Product, Retail
    ...
    
    @pytest.fixture
    def retailer_abc(db):
        return Retail.objects.create(name="ABC")
    
    @pytest.fixture
    def retailers(db) -> list:
        return []
    
    @pytest.fixture(autouse=True)
    def append_retailers(retailers, retailer_abc):
        return retailers.append(retailer_abc)
    
    @pytest.fixture
    def product_one(db, category, retailers):
        product = Product.objects.create(name="Book 1", category=category)
        product.retails.set(retailers)
        return product
    
    def test_product_retailer(db, retailer_abc, product_one):
        assert product_one.retails.filter(name=retailer_abc.name).exists()

    In this example, the append_retailers fixture is an autouse fixture. Because it happens automatically, test_product_retailer is affected by it, even though the test did not request it. That doesn’t mean they can’t be requested though; just that it isn’t necessary.

    Factories as Fixtures

    So far, we have created objects with a small number of arguments. In practice, however, models are more complex and may require more inputs. Let’s say we also need to store the sku, mrp, and weight information along with name and category.

    If we provide every input directly inside the product fixtures, the logic there gets complicated:

    import random
    import string
    import pytest
    from product.models import Category, Product, Retail
    
    @pytest.fixture
    def category(db) -> Category:
        return Category.objects.create(name="Books")
    
    @pytest.fixture
    def retailer_abc(db):
        return Retail.objects.create(name="ABC")
    
    @pytest.fixture
    def product_one(db, category, retailer_abc):
        sku = "".join(random.choices(string.ascii_uppercase + string.digits, k=6))
        product = Product.objects.create(
            sku=sku,
            name="Book 1",
            description="A book for educational purpose.",
            mrp="100.00",
            is_available=True,
            category=category,
        )
        product.retails.set([retailer_abc])
        return product
    
    @pytest.fixture
    def product_two(db, category, retailer_abc):
        sku = "".join(random.choices(string.ascii_uppercase + string.digits, k=6))
        product = Product.objects.create(
            sku=sku,
            name="Book 2",
            description="A book with a thriller story.",
            mrp="50.00",
            is_available=True,
            category=category,
        )
        product.retails.add(retailer_abc)
        return product

    Product creation has somewhat complex logic for managing retailers and generating a unique SKU, and that logic will grow as we keep adding requirements. Extra logic may be needed if we consider discount and coupon-code complexity for every retailer. There may also be many versions of the product instance we want to test against, and you have already seen how difficult such complex code is to maintain.

    The “factory as fixture” pattern can help in these cases, where the same class instance is needed for different tests. Instead of returning an instance directly, the fixture returns a function, and by calling that function you create the instance you want to test.

    import random
    import string
    import pytest
    
    from product.models import Category, Product, Retail
    
    @pytest.fixture
    def category(db) -> Category:
        return Category.objects.create(name="Books")
    
    @pytest.fixture
    def retailer_abc(db):
        return Retail.objects.create(name="ABC")
    
    @pytest.fixture
    def product_factory(db, category, retailer_abc):
        def create_product(
            name, description="A Book", mrp=None, is_available=True, retailers=None
        ):
            if retailers is None:
                retailers = []
            sku = "".join(random.choices(string.ascii_uppercase + string.digits, k=6))
            product = Product.objects.create(
                sku=sku,
                name=name,
                description=description,
                mrp=mrp,
                is_available=is_available,
                category=category,
            )
            product.retails.add(retailer_abc)
            if retailers:
                product.retails.set(retailers)
            return product
    
        return create_product
    
    @pytest.fixture
    def product_one(product_factory):
        return product_factory(name="Book 1", mrp="100.2")
    
    @pytest.fixture
    def product_two(product_factory):
        return product_factory(name="Novel Book", mrp="51")
    
    def test_product_retailer(db, retailer_abc, product_one):
        assert product_one.retails.filter(name=retailer_abc.name).exists()
    
    def test_product_one(product_one):
        assert product_one.name == "Book 1"
        assert product_one.is_available

    This is not far from what you’ve already done, so let’s break it down:

    • The category and retailer_abc fixtures remain the same.
    • A new product_factory fixture is added, injected with the category and retailer_abc fixtures.
    • The product_factory fixture creates a wrapper and returns an inner function called create_product.
    • product_factory is injected into other fixtures and used to create product instances.

    The factory fixture works similarly to how decorators work in Python.
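    The analogy can be sketched with a plain closure (hypothetical names, unrelated to the product code): the outer function captures configuration, and the inner function does the work on demand, just as product_factory captures category and retailer_abc and returns create_product:

```python
def greeting_factory(greeting="Hello"):
    # The outer function captures configuration, like a fixture
    # capturing other fixtures (category, retailer_abc).
    def greet(name):
        # The inner function builds the final value on demand,
        # like create_product building a Product per call.
        return f"{greeting}, {name}!"

    return greet

greet = greeting_factory()
print(greet("Alice"))  # each call produces a fresh value
```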

    Sharing Fixtures Using Scopes

    Fixtures requiring network or database access depend on connectivity and are usually expensive to create. In the previous examples, every time a test requests a fixture, pytest runs the fixture function, generates a new instance, and passes it to the test. So if we have written ‘n’ tests that each request the same fixture, that fixture instance will be created n times during the entire run.

    This is mainly happening because fixtures are created when first requested by a test, and are destroyed based on their scope:

    • Function: the default scope, the fixture is destroyed at the end of the test.
    • Class: the fixture is destroyed during the teardown of the last test in the class.
    • Module: the fixture is destroyed during teardown of the last test in the module.
    • Package: the fixture is destroyed during teardown of the last test in the package.
    • Session: the fixture is destroyed at the end of the test session.

    In the previous example, we can add scope="module" so that the category, retailer_abc, product_one, and product_two fixtures will only be invoked once per test module.

    Multiple test functions in a test module will thus each receive the same category, retailer_abc, product_one, and product_two fixture instance, thus saving time.

    @pytest.fixture(scope="module")
    def category(db) -> Category:
        return Category.objects.create(name="Books")
    
    @pytest.fixture(scope="module")
    def retailer_abc(db):
        return Retail.objects.create(name="ABC")
    
    @pytest.fixture(scope="module")
    def product_one(product_factory):
        return product_factory(name="Book 1", mrp="100.2")
    
    @pytest.fixture(scope="module")
    def product_two(product_factory):
        return product_factory(name="Novel Book", mrp="51")

    This is how we can add scope to the fixtures, and you can do it for all the fixtures.

    But if we try to run this in the terminal, we will encounter an error:

    $ pytest
    ============================= test session starts ==============================
    platform linux -- Python 3.7.0, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
    django: settings: pytest_fixtures.settings (from ini)
    rootdir: /home/suraj/PycharmProjects/e_commerce_app, configfile: pytest.ini
    plugins: django-4.1.0
    collected 2 items                                                              
    
    product/tests/test_models.py EE                                          [100%]
    
    ==================================== ERRORS ====================================
    ___________________ ERROR at setup of test_product_retailer ____________________
    ScopeMismatch: You tried to access the 'function' scoped fixture 'db' with a 'module' scoped request object, involved factories
    product/tests/test_models.py:13:  def retailer_abc(db) -> product.models.Category
    venv/lib/python3.7/site-packages/pytest_django/fixtures.py:193:  def db(request, django_db_setup, django_db_blocker)
    ______________________ ERROR at setup of test_product_one ______________________
    ScopeMismatch: You tried to access the 'function' scoped fixture 'db' with a 'module' scoped request object, involved factories
    ...
    ============================== 2 errors in 0.24s ===============================  

    The reason for this error is that the db fixture is function-scoped deliberately: the transaction rollback at the end of each test ensures the database is left in the same state it was in when the test started. Nevertheless, you can get module- or session-scoped access to the database in a fixture by using the django_db_blocker fixture:

    import random
    import string
    import pytest
    
    from product.models import Category, Product, Retail
    
    @pytest.fixture(scope="module")
    def category(django_db_blocker):
        with django_db_blocker.unblock():
            return Category.objects.create(name="Books")
    
    @pytest.fixture(scope="module")
    def retailer_abc(django_db_blocker):
        with django_db_blocker.unblock():
            return Retail.objects.create(name="ABC")
    
    @pytest.fixture(scope="module")
    def product_factory(django_db_blocker, category, retailer_abc):
        def create_product(
            name, description="A Book", mrp=None, is_available=True, retailers=None
        ):
            if retailers is None:
                retailers = []
            sku = "".join(random.choices(string.ascii_uppercase + string.digits, k=6))
            with django_db_blocker.unblock():
                product = Product.objects.create(
                    sku=sku,
                    name=name,
                    description=description,
                    mrp=mrp,
                    is_available=is_available,
                    category=category,
                )
                product.retails.add(retailer_abc)
                if retailers:
                    product.retails.set(retailers)
                return product
    
        return create_product
    
    @pytest.fixture(scope="module")
    def product_one(product_factory):
        return product_factory(name="Book 1", mrp="100.2")
    
    @pytest.fixture(scope="module")
    def product_two(product_factory):
        return product_factory(name="Novel Book", mrp="51")
    
    def test_product_retailer(db, retailer_abc, product_one):
        assert product_one.retails.filter(name=retailer_abc.name).exists()
    
    def test_product_one(product_one):
        assert product_one.name == "Book 1"
        assert product_one.is_available

    Now, if we go to the terminal and run the tests, it will run successfully.

    $ pytest
    ============================= test session starts ==============================
    platform linux -- Python 3.7.0, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
    django: settings: pytest_fixtures.settings (from ini)
    rootdir: /home/suraj/PycharmProjects/e_commerce_app, configfile: pytest.ini
    plugins: django-4.1.0
    collected 2 items                                                              
    
    product/tests/test_models.py ..                                          [100%]
    
    ============================== 2 passed in 0.22s ===============================

    Warning: Beware that when unblocking the database at module or session scope, you’re on your own if you alter the database in other fixtures or tests.

    Conclusion

    We have covered the various features pytest fixtures provide, how they help from a code-reusability perspective, and how they keep your tests maintainable. Dependency management and arranging your test data become easy with the help of fixtures.

    This blog looked at how you can use fixtures and their various features along with Django models. You can learn more about fixtures by referring to the official documentation.

  • Building A Containerized Microservice in Golang: A Step-by-step Guide

    With the evolving architectural design of web applications, microservices have become a successful trend in architecting the application landscape. Alongside these advancements in application architecture, transport protocols such as REST and gRPC are becoming more efficient and faster. Containerizing microservice applications also helps greatly with agile development and high-speed delivery.

    In this blog, I will try to showcase how simple it is to build a cloud-native application on the microservices architecture using Go.

    We will break the solution into multiple steps. We will learn how to:

    1) Build a microservice and a set of other containerized services, each with a very specific set of independent tasks related only to its logical component.

    2) Use go-kit as the framework for developing and structuring the components of each service.

    3) Build APIs that use HTTP (REST) and Protobuf (gRPC) as the transport mechanisms and PostgreSQL for the databases, and finally deploy to the Azure stack for API management and CI/CD.

    Note: Deployment, and setting up CI/CD and API management on Azure or any other cloud, is not in the scope of the current blog.

    Prerequisites:

    • A beginner’s level of understanding of web services, Rest APIs and gRPC
    • GoLand/ VS Code
    • Properly installed and configured Go. If not, check it out here
    • Set up a new project directory under the GOPATH
    • Understanding of the standard Golang project. For reference, visit here
    • PostgreSQL client installed
    • Go kit

    What are we going to do?

    We will develop a simple web application working on the following problem statement:

    • A global publishing company that publishes books and journals wants to develop a service to watermark their documents. A document (books, journals) has a title, author and a watermark property
    • The watermark operation can be in Started, InProgress and Finished status
    • Only a specific set of users should be able to watermark a document
    • Once the watermark is done, the document can never be re-marked

    Example of a document:

    {"content": "book", "title": "The Dark Code", "author": "Bruce Wayne", "topic": "Science"}

    For a detailed understanding of the requirement, please refer to this.

    Architecture:

    In this project, we will have 3 microservices: Authentication Service, Database Service and the Watermark Service. We have a PostgreSQL database server and an API-Gateway.

    Authentication Service:

    The application is supposed to have a role-based and user-based access control mechanism. This service will authenticate the user according to their role and return only HTTP status codes: 200 when the user is authorized and 401 for unauthorized users.

    APIs:

    • /user/access, Method: GET, Secured: True, payload: user: <name>
      It will take the user name as an input and the auth service will return the roles and the privileges assigned to it
    • /authenticate, Method: GET, Secured: True, payload: user: <name>, operation: <op>
      It will authenticate the user with the passed operation if it is accessible for the role
    • /healthz, Method: GET, Secured: True
      It will return the status of the service

    Database Service:

    We will need databases for our application to store the users, their roles, and the access privileges for those roles. The documents will also be stored in the database, without the watermark: it is a requirement that no document can have a watermark at the time of creation. A document is created successfully only when the data inputs are valid and the database service returns a success status.

    We will be using two databases, one per consuming service. This design is not strictly necessary; we follow it to honor the “single database per service” rule of microservice architecture.

    APIs:

    • /get, Method: GET, Secured: True, payload: filters: []filter{"field-name": "value"}
      It will return the list of documents according to the specific filters passed
    • /update, Method: POST, Secured: True, payload: "Title": <id>, document: {"field": "value", …}
      It will update the document for the given title id
    • /add, Method: POST, Secured: True, payload: document: {"field": "value", …}
      It will add the document and return the title-ID
    • /remove, Method: POST, Secured: True, payload: title: <id>
      It will remove the document entry according to the passed title-id
    • /healthz, Method: GET, Secured: True
      It will return the status of the service

    Watermark Service:

    This is the main service that performs the API calls to watermark a given document. Every time a user needs to watermark a document, they pass the TicketID along with the appropriate Mark in the watermark API request. The service internally calls the database Update API with the provided request and returns the status of the watermark process: initially “Started”, then after some time “InProgress”, and finally “Finished” if the call was valid, or “Error” if the request is not valid.

    APIs:

    • /get, Method: GET, Secured: True, payload: filters: []filter{"field-name": "value"}
      It will return the list of documents according to the specific filters passed
    • /status, Method: GET, Secured: True, payload: "Ticket": <id>
      It will return the status of the watermark operation on the document for the passed ticket-id
    • /addDocument, Method: POST, Secured: True, payload: document: {"field": "value", …}
      It will add the document and return the title-ID
    • /watermark, Method: POST, Secured: True, payload: title: <id>, mark: "string"
      It is the main watermark operation API, which accepts the mark string
    • /healthz, Method: GET, Secured: True
      It will return the status of the service

    Operations and Flow:

    Watermark Service APIs are the only ones that will be used by the user/actor to request watermark or add the document. Authentication and Database service APIs are the private ones that will be called by other services internally. The only URL accessible to the user is the API Gateway URL.

    1. The user will access the API Gateway URL with the required user name, the ticket-id, and the mark they want applied to the document
    2. The user should not know about the authentication or database services
    3. Once the request is made by the user, it will be accepted by the API Gateway. The gateway will validate the request along with the payload
    4. An API forwarding rule, routing traffic for a specific request to a service, should be defined in the gateway. Once validated, the request will be forwarded to the service according to that rule.
    5. We will define an API forwarding rule where the request made for any watermark will be first forwarded to the authentication service which will authenticate the request, check for authorized users and return the appropriate status code.
    6. The authorization service will check for the user from which the request has been made, into the user database and its roles and permissions. It will send the response accordingly
    7. Once the request has been authorized by the service, it will be forwarded back to the actual watermark service
    8. The watermark service then performs the appropriate operation: putting the watermark on the document, adding a new document entry, or any other request
    9. The Get, Watermark, or AddDocument operation of the watermark service will be performed by calling the database CRUD APIs, and the result forwarded to the user
    10. If the request is AddDocument, the service should return the “TicketID”; if it is for watermark, it should return the status of the operation

    Note:

    Each user will have specific roles, based on which their access controls are identified. For the sake of simplicity, the roles will be based on the type of document only, not the specific name of the book or journal.

    Getting Started:

    Let’s start by creating a folder for our application in the $GOPATH. This will be the root folder containing our set of services.

    Project Layout:

    The project will follow the standard Golang project layout. If you want the full working code, please refer here

    • api: Stores the versions of the APIs swagger files and also the proto and pb files for the gRPC protobuf interface.
    • cmd: This will contain the entry point (main.go) files for all the services, and for any other container images if any
    • docs: This will contain the documentation for the project
    • config: All the sample files or any specific configuration files should be stored here
    • deploy: This directory will contain the deployment files used to deploy the application
    • internal: This package is the conventional internal package identified by the Go compiler. It contains all the packages which need to be private and imported by its child directories and immediate parent directory. All the packages from this directory are common across the project
    • pkg: This directory will have the complete executing code of all the services in separate packages.
    • tests: It will have all the integration and E2E tests
    • vendor: This directory stores all the third-party dependencies locally so that dependency versions don’t mismatch later

    We are going to use the Go kit framework for developing the set of services. The official Go kit examples of services are very good, though the documentation is not that great.

    Watermark Service:

    1. Under the Go kit framework, a service should always be represented by an interface.

    Create a package named watermark in the pkg folder. Create a new service.go file in that package. This file is the blueprint of our service.

    package watermark
    
    import (
    	"context"
    
    	"github.com/velotiotech/watermark-service/internal"
    )
    
    type Service interface {
    	// Get the list of all documents
    	Get(ctx context.Context, filters ...internal.Filter) ([]internal.Document, error)
    	Status(ctx context.Context, ticketID string) (internal.Status, error)
    	Watermark(ctx context.Context, ticketID, mark string) (int, error)
    	AddDocument(ctx context.Context, doc *internal.Document) (string, error)
    	ServiceStatus(ctx context.Context) (int, error)
    }

    2. As per the functions defined in the interface, we will need five endpoints to handle requests for the above methods. If you are wondering why we use the context package, refer here. Contexts enable microservices to handle multiple concurrent requests; we won’t use them heavily in this blog, but passing a context is the idiomatic way to write service methods.

    3. Implementing our service:

    package watermark
    
    import (
    	"context"
    	"net/http"
    	"os"
    
    	"github.com/velotiotech/watermark-service/internal"
    
    	"github.com/go-kit/kit/log"
    	"github.com/lithammer/shortuuid/v3"
    )
    
    type watermarkService struct{}
    
    func NewService() Service { return &watermarkService{} }
    
    func (w *watermarkService) Get(_ context.Context, filters ...internal.Filter) ([]internal.Document, error) {
    	// query the database using the filters and return the list of documents
    	// return error if the filter (key) is invalid and also return error if no item found
    	doc := internal.Document{
    		Content: "book",
    		Title:   "Harry Potter and Half Blood Prince",
    		Author:  "J.K. Rowling",
    		Topic:   "Fiction and Magic",
    	}
    	return []internal.Document{doc}, nil
    }
    
    func (w *watermarkService) Status(_ context.Context, ticketID string) (internal.Status, error) {
    	// query database using the ticketID and return the document info
    	// return err if the ticketID is invalid or no Document exists for that ticketID
    	return internal.InProgress, nil
    }
    
    func (w *watermarkService) Watermark(_ context.Context, ticketID, mark string) (int, error) {
    	// update the database entry with watermark field as non empty
    	// first check if the watermark status is not already in InProgress, Started or Finished state
    	// If yes, then return invalid request
    	// return error if no item found using the ticketID
    	return http.StatusOK, nil
    }
    
    func (w *watermarkService) AddDocument(_ context.Context, doc *internal.Document) (string, error) {
    	// add the document entry in the database by calling the database service
    	// return error if the doc is invalid and/or the database invalid entry error
    	newTicketID := shortuuid.New()
    	return newTicketID, nil
    }
    
    func (w *watermarkService) ServiceStatus(_ context.Context) (int, error) {
    	logger.Log("Checking the Service health...")
    	return http.StatusOK, nil
    }
    
    var logger log.Logger
    
    func init() {
    	logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
    	logger = log.With(logger, "ts", log.DefaultTimestampUTC)
    }

    We have defined the new type watermarkService empty struct which will implement the above-defined service interface. This struct implementation will be hidden from the rest of the world.

    NewService() is created as the constructor of our “object”. This is the only function available outside this package to instantiate the service.

    4. Now we will create the endpoints package, which contains two files. One will store all the request and response types. The other, endpoints.go, will contain the actual request parsing and the calls to the appropriate service functions.

    – Create a file named reqJSONMap.go. In this file, we define all the request and response structs, such as GetRequest, GetResponse, StatusRequest, StatusResponse, etc. Add to these structs the fields we expect as input in a request or want to return in a response.

    package endpoints
    
    import "github.com/velotiotech/watermark-service/internal"
    
    type GetRequest struct {
    	Filters []internal.Filter `json:"filters,omitempty"`
    }
    
    type GetResponse struct {
    	Documents []internal.Document `json:"documents"`
    	Err       string              `json:"err,omitempty"`
    }
    
    type StatusRequest struct {
    	TicketID string `json:"ticketID"`
    }
    
    type StatusResponse struct {
    	Status internal.Status `json:"status"`
    	Err    string          `json:"err,omitempty"`
    }
    
    type WatermarkRequest struct {
    	TicketID string `json:"ticketID"`
    	Mark     string `json:"mark"`
    }
    
    type WatermarkResponse struct {
    	Code int    `json:"code"`
    	Err  string `json:"err"`
    }
    
    type AddDocumentRequest struct {
    	Document *internal.Document `json:"document"`
    }
    
    type AddDocumentResponse struct {
    	TicketID string `json:"ticketID"`
    	Err      string `json:"err,omitempty"`
    }
    
    type ServiceStatusRequest struct{}
    
    type ServiceStatusResponse struct {
    	Code int    `json:"status"`
    	Err  string `json:"err,omitempty"`
    }

    – Create a file named endpoints.go. This file will contain the actual calling of the service implemented functions.

    package endpoints
    
    import (
    	"context"
    	"errors"
    	"os"
    
    	"github.com/velotiotech/watermark-service/internal"
    	"github.com/velotiotech/watermark-service/pkg/watermark"
    
    	"github.com/go-kit/kit/endpoint"
    	"github.com/go-kit/kit/log"
    )
    
    type Set struct {
    	GetEndpoint           endpoint.Endpoint
    	AddDocumentEndpoint   endpoint.Endpoint
    	StatusEndpoint        endpoint.Endpoint
    	ServiceStatusEndpoint endpoint.Endpoint
    	WatermarkEndpoint     endpoint.Endpoint
    }
    
    func NewEndpointSet(svc watermark.Service) Set {
    	return Set{
    		GetEndpoint:           MakeGetEndpoint(svc),
    		AddDocumentEndpoint:   MakeAddDocumentEndpoint(svc),
    		StatusEndpoint:        MakeStatusEndpoint(svc),
    		ServiceStatusEndpoint: MakeServiceStatusEndpoint(svc),
    		WatermarkEndpoint:     MakeWatermarkEndpoint(svc),
    	}
    }
    
    func MakeGetEndpoint(svc watermark.Service) endpoint.Endpoint {
    	return func(ctx context.Context, request interface{}) (interface{}, error) {
    		req := request.(GetRequest)
    		docs, err := svc.Get(ctx, req.Filters...)
    		if err != nil {
    			return GetResponse{docs, err.Error()}, nil
    		}
    		return GetResponse{docs, ""}, nil
    	}
    }
    
    func MakeStatusEndpoint(svc watermark.Service) endpoint.Endpoint {
    	return func(ctx context.Context, request interface{}) (interface{}, error) {
    		req := request.(StatusRequest)
    		status, err := svc.Status(ctx, req.TicketID)
    		if err != nil {
    			return StatusResponse{Status: status, Err: err.Error()}, nil
    		}
    		return StatusResponse{Status: status, Err: ""}, nil
    	}
    }
    
    func MakeAddDocumentEndpoint(svc watermark.Service) endpoint.Endpoint {
    	return func(ctx context.Context, request interface{}) (interface{}, error) {
    		req := request.(AddDocumentRequest)
    		ticketID, err := svc.AddDocument(ctx, req.Document)
    		if err != nil {
    			return AddDocumentResponse{TicketID: ticketID, Err: err.Error()}, nil
    		}
    		return AddDocumentResponse{TicketID: ticketID, Err: ""}, nil
    	}
    }
    
    func MakeWatermarkEndpoint(svc watermark.Service) endpoint.Endpoint {
    	return func(ctx context.Context, request interface{}) (interface{}, error) {
    		req := request.(WatermarkRequest)
    		code, err := svc.Watermark(ctx, req.TicketID, req.Mark)
    		if err != nil {
    			return WatermarkResponse{Code: code, Err: err.Error()}, nil
    		}
    		return WatermarkResponse{Code: code, Err: ""}, nil
    	}
    }
    
    func MakeServiceStatusEndpoint(svc watermark.Service) endpoint.Endpoint {
    	return func(ctx context.Context, request interface{}) (interface{}, error) {
    		_ = request.(ServiceStatusRequest)
    		code, err := svc.ServiceStatus(ctx)
    		if err != nil {
    			return ServiceStatusResponse{Code: code, Err: err.Error()}, nil
    		}
    		return ServiceStatusResponse{Code: code, Err: ""}, nil
    	}
    }
    
    func (s *Set) Get(ctx context.Context, filters ...internal.Filter) ([]internal.Document, error) {
    	resp, err := s.GetEndpoint(ctx, GetRequest{Filters: filters})
    	if err != nil {
    		return []internal.Document{}, err
    	}
    	getResp := resp.(GetResponse)
    	if getResp.Err != "" {
    		return []internal.Document{}, errors.New(getResp.Err)
    	}
    	return getResp.Documents, nil
    }
    
    func (s *Set) ServiceStatus(ctx context.Context) (int, error) {
    	resp, err := s.ServiceStatusEndpoint(ctx, ServiceStatusRequest{})
    	svcStatusResp := resp.(ServiceStatusResponse)
    	if err != nil {
    		return svcStatusResp.Code, err
    	}
    	if svcStatusResp.Err != "" {
    		return svcStatusResp.Code, errors.New(svcStatusResp.Err)
    	}
    	return svcStatusResp.Code, nil
    }
    
    func (s *Set) AddDocument(ctx context.Context, doc *internal.Document) (string, error) {
    	resp, err := s.AddDocumentEndpoint(ctx, AddDocumentRequest{Document: doc})
    	if err != nil {
    		return "", err
    	}
    	adResp := resp.(AddDocumentResponse)
    	if adResp.Err != "" {
    		return "", errors.New(adResp.Err)
    	}
    	return adResp.TicketID, nil
    }
    
    func (s *Set) Status(ctx context.Context, ticketID string) (internal.Status, error) {
    	resp, err := s.StatusEndpoint(ctx, StatusRequest{TicketID: ticketID})
    	if err != nil {
    		return internal.Failed, err
    	}
    	stsResp := resp.(StatusResponse)
    	if stsResp.Err != "" {
    		return internal.Failed, errors.New(stsResp.Err)
    	}
    	return stsResp.Status, nil
    }
    
    func (s *Set) Watermark(ctx context.Context, ticketID, mark string) (int, error) {
    	resp, err := s.WatermarkEndpoint(ctx, WatermarkRequest{TicketID: ticketID, Mark: mark})
    	wmResp := resp.(WatermarkResponse)
    	if err != nil {
    		return wmResp.Code, err
    	}
    	if wmResp.Err != "" {
    		return wmResp.Code, errors.New(wmResp.Err)
    	}
    	return wmResp.Code, nil
    }
    
    var logger log.Logger
    
    func init() {
    	logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
    	logger = log.With(logger, "ts", log.DefaultTimestampUTC)
    }

    In this file, we have a struct Set, which is the collection of all the endpoints, along with a constructor for it. We also have internal constructor functions such as MakeGetEndpoint() and MakeStatusEndpoint(), which return objects implementing Go kit’s generic endpoint.Endpoint interface.

    In order to expose the Get, Status, Watermark, ServiceStatus, and AddDocument APIs, we need to create endpoints for all of them. These functions handle the incoming requests and call the specific service methods.

    5. Adding the transport methods to expose the service. Our service will support HTTP, exposed via REST APIs, as well as gRPC with protobuf.

    Create a separate transport package in the watermark directory. This package will hold all the handlers, decoders, and encoders for each transport mechanism.

    6. Create a file http.go: This file will have the transport functions and handlers for HTTP, with a separate path for each API route.

    package transport
    
    import (
    	"context"
    	"encoding/json"
    	"net/http"
    	"os"
    
    	"github.com/velotiotech/watermark-service/internal/util"
    	"github.com/velotiotech/watermark-service/pkg/watermark/endpoints"
    
    	"github.com/go-kit/kit/log"
    	httptransport "github.com/go-kit/kit/transport/http"
    )
    
    func NewHTTPHandler(ep endpoints.Set) http.Handler {
    	m := http.NewServeMux()
    
    	m.Handle("/healthz", httptransport.NewServer(
    		ep.ServiceStatusEndpoint,
    		decodeHTTPServiceStatusRequest,
    		encodeResponse,
    	))
    	m.Handle("/status", httptransport.NewServer(
    		ep.StatusEndpoint,
    		decodeHTTPStatusRequest,
    		encodeResponse,
    	))
    	m.Handle("/addDocument", httptransport.NewServer(
    		ep.AddDocumentEndpoint,
    		decodeHTTPAddDocumentRequest,
    		encodeResponse,
    	))
    	m.Handle("/get", httptransport.NewServer(
    		ep.GetEndpoint,
    		decodeHTTPGetRequest,
    		encodeResponse,
    	))
    	m.Handle("/watermark", httptransport.NewServer(
    		ep.WatermarkEndpoint,
    		decodeHTTPWatermarkRequest,
    		encodeResponse,
    	))
    
    	return m
    }
    
    func decodeHTTPGetRequest(_ context.Context, r *http.Request) (interface{}, error) {
    	var req endpoints.GetRequest
    	if r.ContentLength == 0 {
    		logger.Log("Get request with no body")
    		return req, nil
    	}
    	err := json.NewDecoder(r.Body).Decode(&req)
    	if err != nil {
    		return nil, err
    	}
    	return req, nil
    }
    
    func decodeHTTPStatusRequest(ctx context.Context, r *http.Request) (interface{}, error) {
    	var req endpoints.StatusRequest
    	err := json.NewDecoder(r.Body).Decode(&req)
    	if err != nil {
    		return nil, err
    	}
    	return req, nil
    }
    
    func decodeHTTPWatermarkRequest(_ context.Context, r *http.Request) (interface{}, error) {
    	var req endpoints.WatermarkRequest
    	err := json.NewDecoder(r.Body).Decode(&req)
    	if err != nil {
    		return nil, err
    	}
    	return req, nil
    }
    
    func decodeHTTPAddDocumentRequest(_ context.Context, r *http.Request) (interface{}, error) {
    	var req endpoints.AddDocumentRequest
    	err := json.NewDecoder(r.Body).Decode(&req)
    	if err != nil {
    		return nil, err
    	}
    	return req, nil
    }
    
    func decodeHTTPServiceStatusRequest(_ context.Context, _ *http.Request) (interface{}, error) {
    	var req endpoints.ServiceStatusRequest
    	return req, nil
    }
    
    func encodeResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
    	if e, ok := response.(error); ok && e != nil {
    		encodeError(ctx, e, w)
    		return nil
    	}
    	return json.NewEncoder(w).Encode(response)
    }
    
    func encodeError(_ context.Context, err error, w http.ResponseWriter) {
    	w.Header().Set("Content-Type", "application/json; charset=utf-8")
    	switch err {
    	case util.ErrUnknown:
    		w.WriteHeader(http.StatusNotFound)
    	case util.ErrInvalidArgument:
    		w.WriteHeader(http.StatusBadRequest)
    	default:
    		w.WriteHeader(http.StatusInternalServerError)
    	}
    	json.NewEncoder(w).Encode(map[string]interface{}{
    		"error": err.Error(),
    	})
    }
    
    var logger log.Logger
    
    func init() {
    	logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
    	logger = log.With(logger, "ts", log.DefaultTimestampUTC)
    }

    This file maps JSON payloads to their requests and responses. It contains the HTTP handler constructor, which registers each API route to its specific handler function (endpoint), along with the decoders and encoders for the requests and responses. The decoders and encoders exist simply to translate requests and responses into the form the server expects; in our case, we just use the JSON decoder and encoder to convert them into the appropriate request and response structs.

    We have the generic encoder for the response output, which is a simple JSON encoder.

    7. Create another file in the same transport package with the name grpc.go. As the name suggests, this file maps the protobuf payloads to their requests and responses. We create a gRPC handler constructor that builds the set of gRPC servers and registers each endpoint with the decoders and encoders for its requests and responses.


    – Before moving on to the implementation, we have to create a proto file that acts as the definition of all our service interfaces and their request/response structs, so that the protobuf (.pb) files can be generated and used as the interface over which services communicate.

    – Create a package pb in the api/v1 package path and a new file watermarksvc.proto. First, we will define our service interface, which represents the remote functions to be called by the client. Refer to the protobuf language guide for syntax and a deeper understanding of protobuf.

    We will translate the Go service interface into a service definition in the proto file. We also recreate the request and response structs in the proto file so that they can be understood by the RPCs defined in the service.

    syntax = "proto3";
    
    package pb;
    
    service Watermark {
        rpc Get (GetRequest) returns (GetReply) {}
    
        rpc Watermark (WatermarkRequest) returns (WatermarkReply) {}
    
        rpc Status (StatusRequest) returns (StatusReply) {}
    
        rpc AddDocument (AddDocumentRequest) returns (AddDocumentReply) {}
    
        rpc ServiceStatus (ServiceStatusRequest) returns (ServiceStatusReply) {}
    }
    
    message Document {
        string content = 1;
        string title = 2;
        string author = 3;
        string topic = 4;
        string watermark = 5;
    }
    
    message GetRequest {
        message Filters {
            string key = 1;
            string value = 2;
        }
        repeated Filters filters = 1;
    }
    
    message GetReply {
        repeated Document documents = 1;
        string Err = 2;
    }
    
    message StatusRequest {
        string ticketID = 1;
    }
    
    message StatusReply {
        enum Status {
            PENDING = 0;
            STARTED = 1;
            IN_PROGRESS = 2;
            FINISHED = 3;
            FAILED = 4;
        }
        Status status = 1;
        string Err = 2;
    }
    
    message WatermarkRequest {
        string ticketID = 1;
        string mark = 2;
    }
    
    message WatermarkReply {
        int64 code = 1;
        string err = 2;
    }
    
    message AddDocumentRequest {
        Document document = 1;
    }
    
    message AddDocumentReply {
        string ticketID = 1;
        string err = 2;
    }
    
    message ServiceStatusRequest {}
    
    message ServiceStatusReply {
        int64 code = 1;
        string err = 2;
    }

    Note: Creating the proto files and generating the pb files using protoc is not in the scope of this blog. We assume you already know how to create a proto file and generate a pb file from it. If not, please refer to the protobuf and protoc-gen-go documentation.

    I have also created a script to generate the pb file; it just needs the name of the proto file (with its path).

    #!/usr/bin/env sh
    
    # Install proto3 from source
    #  brew install autoconf automake libtool
    #  git clone https://github.com/google/protobuf
    #  ./autogen.sh ; ./configure ; make ; make install
    #
    # Update protoc Go bindings via
    #  go get -u github.com/golang/protobuf/{proto,protoc-gen-go}
    #
    # See also
    #  https://github.com/grpc/grpc-go/tree/master/examples
    
    REPO_ROOT="${REPO_ROOT:-$(cd "$(dirname "$0")/../.." && pwd)}"
    PB_PATH="${REPO_ROOT}/api/v1/pb"
    PROTO_FILE=${1:-"watermarksvc.proto"}
    
    
    echo "Generating pb files for ${PROTO_FILE} service"
    protoc -I="${PB_PATH}"  "${PB_PATH}/${PROTO_FILE}" --go_out=plugins=grpc:"${PB_PATH}"

    8. Now, once the pb file is generated in the api/v1/pb/watermark package, we will create a new struct grpcServer, grouping all the endpoints for gRPC. This struct should implement pb.WatermarkServer, the server interface referenced by the services.

    To implement this interface, we define functions such as func (g *grpcServer) Get(ctx context.Context, r *pb.GetRequest) (*pb.GetReply, error). Each such function takes the request param, runs the corresponding handler’s ServeGRPC() function, and returns the response. We implement the rest of the RPC methods the same way.

    These functions are the actual Remote Procedures to be called by the service.

    We also need decode and encode functions that map between the proto request/response structs and the endpoint request/response structs. For example, func decodeGRPCGetRequest(_ context.Context, grpcReq interface{}) (interface{}, error) asserts grpcReq to *pb.GetRequest and uses its fields to fill a new struct of type endpoints.GetRequest{}. The decoding and encoding functions for the other requests and responses are implemented similarly.

    package transport
    
    import (
    	"context"
    
    	"github.com/velotiotech/watermark-service/api/v1/pb/watermark"
    
    	"github.com/velotiotech/watermark-service/internal"
    	"github.com/velotiotech/watermark-service/pkg/watermark/endpoints"
    
    	grpctransport "github.com/go-kit/kit/transport/grpc"
    )
    
    type grpcServer struct {
    	get           grpctransport.Handler
    	status        grpctransport.Handler
    	addDocument   grpctransport.Handler
    	watermark     grpctransport.Handler
    	serviceStatus grpctransport.Handler
    }
    
    func NewGRPCServer(ep endpoints.Set) watermark.WatermarkServer {
    	return &grpcServer{
    		get: grpctransport.NewServer(
    			ep.GetEndpoint,
    			decodeGRPCGetRequest,
    			encodeGRPCGetResponse,
    		),
    		status: grpctransport.NewServer(
    			ep.StatusEndpoint,
    			decodeGRPCStatusRequest,
    			encodeGRPCStatusResponse,
    		),
    		addDocument: grpctransport.NewServer(
    			ep.AddDocumentEndpoint,
    			decodeGRPCAddDocumentRequest,
    			encodeGRPCAddDocumentResponse,
    		),
    		watermark: grpctransport.NewServer(
    			ep.WatermarkEndpoint,
    			decodeGRPCWatermarkRequest,
    			encodeGRPCWatermarkResponse,
    		),
    		serviceStatus: grpctransport.NewServer(
    			ep.ServiceStatusEndpoint,
    			decodeGRPCServiceStatusRequest,
    			encodeGRPCServiceStatusResponse,
    		),
    	}
    }
    
    func (g *grpcServer) Get(ctx context.Context, r *watermark.GetRequest) (*watermark.GetReply, error) {
    	_, rep, err := g.get.ServeGRPC(ctx, r)
    	if err != nil {
    		return nil, err
    	}
    	return rep.(*watermark.GetReply), nil
    }
    
    func (g *grpcServer) ServiceStatus(ctx context.Context, r *watermark.ServiceStatusRequest) (*watermark.ServiceStatusReply, error) {
    	// Dispatch to the serviceStatus handler here, not the get handler.
    	_, rep, err := g.serviceStatus.ServeGRPC(ctx, r)
    	if err != nil {
    		return nil, err
    	}
    	return rep.(*watermark.ServiceStatusReply), nil
    }
    
    func (g *grpcServer) AddDocument(ctx context.Context, r *watermark.AddDocumentRequest) (*watermark.AddDocumentReply, error) {
    	_, rep, err := g.addDocument.ServeGRPC(ctx, r)
    	if err != nil {
    		return nil, err
    	}
    	return rep.(*watermark.AddDocumentReply), nil
    }
    
    func (g *grpcServer) Status(ctx context.Context, r *watermark.StatusRequest) (*watermark.StatusReply, error) {
    	_, rep, err := g.status.ServeGRPC(ctx, r)
    	if err != nil {
    		return nil, err
    	}
    	return rep.(*watermark.StatusReply), nil
    }
    
    func (g *grpcServer) Watermark(ctx context.Context, r *watermark.WatermarkRequest) (*watermark.WatermarkReply, error) {
    	_, rep, err := g.watermark.ServeGRPC(ctx, r)
    	if err != nil {
    		return nil, err
    	}
    	return rep.(*watermark.WatermarkReply), nil
    }
    
    func decodeGRPCGetRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    	req := grpcReq.(*watermark.GetRequest)
    	var filters []internal.Filter
    	for _, f := range req.Filters {
    		filters = append(filters, internal.Filter{Key: f.Key, Value: f.Value})
    	}
    	return endpoints.GetRequest{Filters: filters}, nil
    }
    
    func decodeGRPCStatusRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    	req := grpcReq.(*watermark.StatusRequest)
    	return endpoints.StatusRequest{TicketID: req.TicketID}, nil
    }
    
    func decodeGRPCWatermarkRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    	req := grpcReq.(*watermark.WatermarkRequest)
    	return endpoints.WatermarkRequest{TicketID: req.TicketID, Mark: req.Mark}, nil
    }
    
    func decodeGRPCAddDocumentRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    	req := grpcReq.(*watermark.AddDocumentRequest)
    	doc := &internal.Document{
    		Content:   req.Document.Content,
    		Title:     req.Document.Title,
    		Author:    req.Document.Author,
    		Topic:     req.Document.Topic,
    		Watermark: req.Document.Watermark,
    	}
    	return endpoints.AddDocumentRequest{Document: doc}, nil
    }
    
    func decodeGRPCServiceStatusRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    	return endpoints.ServiceStatusRequest{}, nil
    }
    
    // The server-side encoders below translate the endpoint response structs
    // back into the generated proto reply structs. (A server's response
    // encoder receives the endpoint response, not a proto reply.)
    func encodeGRPCGetResponse(_ context.Context, response interface{}) (interface{}, error) {
    	resp := response.(endpoints.GetResponse)
    	var docs []*watermark.Document
    	for _, d := range resp.Documents {
    		docs = append(docs, &watermark.Document{
    			Content:   d.Content,
    			Title:     d.Title,
    			Author:    d.Author,
    			Topic:     d.Topic,
    			Watermark: d.Watermark,
    		})
    	}
    	return &watermark.GetReply{Documents: docs, Err: resp.Err}, nil
    }
    
    func encodeGRPCStatusResponse(_ context.Context, response interface{}) (interface{}, error) {
    	resp := response.(endpoints.StatusResponse)
    	// internal.Status is assumed to be an integer-backed enum aligned
    	// with the proto Status values.
    	return &watermark.StatusReply{Status: watermark.StatusReply_Status(resp.Status), Err: resp.Err}, nil
    }
    
    func encodeGRPCWatermarkResponse(_ context.Context, response interface{}) (interface{}, error) {
    	resp := response.(endpoints.WatermarkResponse)
    	return &watermark.WatermarkReply{Code: int64(resp.Code), Err: resp.Err}, nil
    }
    
    func encodeGRPCAddDocumentResponse(_ context.Context, response interface{}) (interface{}, error) {
    	resp := response.(endpoints.AddDocumentResponse)
    	return &watermark.AddDocumentReply{TicketID: resp.TicketID, Err: resp.Err}, nil
    }
    
    func encodeGRPCServiceStatusResponse(_ context.Context, response interface{}) (interface{}, error) {
    	resp := response.(endpoints.ServiceStatusResponse)
    	return &watermark.ServiceStatusReply{Code: int64(resp.Code), Err: resp.Err}, nil
    }

    9. Finally, we just have to create the entry point (main) files in the cmd directory for each service. We have already mapped the routes to the endpoints (which call the service functions) and mapped the proto service server to the endpoints (via the ServeGRPC() functions); now we have to call the HTTP and gRPC server constructors and start the servers.

    Create a package watermark in the cmd directory and create a file watermark.go, which will hold the code to start and stop the HTTP and gRPC servers for the service.

    package main
    
    import (
    	"fmt"
    	"net"
    	"net/http"
    	"os"
    	"os/signal"
    	"syscall"
    
    	pb "github.com/velotiotech/watermark-service/api/v1/pb/watermark"
    	"github.com/velotiotech/watermark-service/pkg/watermark"
    	"github.com/velotiotech/watermark-service/pkg/watermark/endpoints"
    	"github.com/velotiotech/watermark-service/pkg/watermark/transport"
    
    	"github.com/go-kit/kit/log"
    	kitgrpc "github.com/go-kit/kit/transport/grpc"
    	"github.com/oklog/oklog/pkg/group"
    	"google.golang.org/grpc"
    )
    
    const (
    	defaultHTTPPort = "8081"
    	defaultGRPCPort = "8082"
    )
    
    func main() {
    	var (
    		logger   log.Logger
    		httpAddr = net.JoinHostPort("localhost", envString("HTTP_PORT", defaultHTTPPort))
    		grpcAddr = net.JoinHostPort("localhost", envString("GRPC_PORT", defaultGRPCPort))
    	)
    
    	logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
    	logger = log.With(logger, "ts", log.DefaultTimestampUTC)
    
    	var (
    		service     = watermark.NewService()
    		eps         = endpoints.NewEndpointSet(service)
    		httpHandler = transport.NewHTTPHandler(eps)
    		grpcServer  = transport.NewGRPCServer(eps)
    	)
    
    	var g group.Group
    	{
    		// The HTTP listener mounts the Go kit HTTP handler we created.
    		httpListener, err := net.Listen("tcp", httpAddr)
    		if err != nil {
    			logger.Log("transport", "HTTP", "during", "Listen", "err", err)
    			os.Exit(1)
    		}
    		g.Add(func() error {
    			logger.Log("transport", "HTTP", "addr", httpAddr)
    			return http.Serve(httpListener, httpHandler)
    		}, func(error) {
    			httpListener.Close()
    		})
    	}
    	{
    		// The gRPC listener mounts the Go kit gRPC server we created.
    		grpcListener, err := net.Listen("tcp", grpcAddr)
    		if err != nil {
    			logger.Log("transport", "gRPC", "during", "Listen", "err", err)
    			os.Exit(1)
    		}
    		g.Add(func() error {
    			logger.Log("transport", "gRPC", "addr", grpcAddr)
    			// The Go kit gRPC interceptor propagates request metadata into the
    			// context so that any Go kit middleware (e.g., tracing) can use it.
    			baseServer := grpc.NewServer(grpc.UnaryInterceptor(kitgrpc.Interceptor))
    			pb.RegisterWatermarkServer(baseServer, grpcServer)
    			return baseServer.Serve(grpcListener)
    		}, func(error) {
    			grpcListener.Close()
    		})
    	}
    	{
    		// This function just sits and waits for ctrl-C.
    		cancelInterrupt := make(chan struct{})
    		g.Add(func() error {
    			c := make(chan os.Signal, 1)
    			signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
    			select {
    			case sig := <-c:
    				return fmt.Errorf("received signal %s", sig)
    			case <-cancelInterrupt:
    				return nil
    			}
    		}, func(error) {
    			close(cancelInterrupt)
    		})
    	}
    	logger.Log("exit", g.Run())
    }
    
    func envString(env, fallback string) string {
    	e := os.Getenv(env)
    	if e == "" {
    		return fallback
    	}
    	return e
    }

    Let’s walk through the above code. First, we use fixed default ports for the servers to listen on: 8081 for the HTTP server and 8082 for the gRPC server. Then, with these code stubs, we create the service, the endpoint set for the service backend, and the HTTP and gRPC servers.

    service = watermark.NewService()
    eps = endpoints.NewEndpointSet(service)
    grpcServer = transport.NewGRPCServer(eps)
    httpHandler = transport.NewHTTPHandler(eps)

    Now the next step is interesting. We create a variable of type group.Group (from the oklog group package); if you are new to this term, please refer to its documentation. Group helps you elegantly manage a group of goroutines. We create three goroutines: one for the HTTP server, one for the gRPC server, and the last one watching for cancel interrupts. Just like this:

    g.Add(func() error {
        logger.Log("transport", "HTTP", "addr", httpAddr)
        return http.Serve(httpListener, httpHandler)
    }, func(error) {
        httpListener.Close()
    })

    Similarly, we start the gRPC server and the cancel-interrupt watcher.
    Great! We are done here. Now, let’s run the service.

    go run ./cmd/watermark/watermark.go

    The server has started locally. Now, open Postman or run curl against one of the endpoints. See below:
    We hit the HTTP server to check the service status:

    ~ curl http://localhost:8081/healthz
    {"status":200}

    We have successfully created a service and exercised its endpoints.

    Further:

    I always like to make a project complete, with all the surrounding maintenance pieces in place: a proper README, .gitignore, .dockerignore, Makefile, Dockerfiles, golangci-lint config files, CI/CD config files, etc.

    I have created a separate Dockerfile for each of the three services under the path /images/.

    It is a multi-stage Dockerfile: we copy the appropriate source directories into a build image, build the service binary there, then create a new image in the same file and copy just the binary into it from the previous stage. The Dockerfiles for the other services are created similarly.

    In the Dockerfile, the CMD runs the watermark service; this command is the entry point of the container.
    I have also created a Makefile with two main targets: build-image and build-push. The first builds the image, and the second pushes it.

    Note: I am keeping this blog concise, as it is difficult to cover everything here. The repo I shared at the beginning covers most of the important concepts around the services, and I am still committing improvements and features.

    Let’s see how we can deploy:

    We will see how to deploy all these services on a container orchestration platform (e.g., Kubernetes). This assumes you have at least a beginner’s understanding of Kubernetes.

    In the deploy dir, create a sample deployment having three containers: auth, watermark, and database. Since the entry point command for each container is already defined in its Dockerfile, we don’t need to pass any args or cmd in the deployment.

    We will also need a Kubernetes Service to route external traffic to the pods, via either a LoadBalancer or a NodePort type service. For now, a NodePort service is enough to expose the watermark-service and get it running.

    Another important and very interesting part is deploying the API gateway. This requires at least some knowledge of a cloud provider’s stack. I used the Azure stack to deploy an API gateway using the resource called “API Management” in the Azure portal. Refer to the rules config files for the Azure APIM API gateway.

    Beyond that, only a proper CI/CD setup remains, which is one of the most essential parts of a project after development.
    I would definitely like to discuss all the above deployment-related topics in more detail, but that is not in the scope of this blog. Maybe I will cover them in another post.

    Wrapping up:

    We have learned how to build a complete project with three microservices in Golang using Go kit, one of the best frameworks for developing distributed systems. We also used PostgreSQL through GORM, an ORM heavily used in the Go community.
    We did not stop at development: we also covered, at a high level, the rest of the project lifecycle by understanding what, how, and where to deploy.

    We created one microservice completely from scratch. Go kit makes it very simple to express the relationships between endpoints, service implementations, and the communication/transport mechanisms. Now, go and try to create the other services from the problem statement.

  • ClickHouse – The Newest Data Store in Your Big Data Arsenal

    ClickHouse

    ClickHouse is an open-source column-oriented data warehouse for online analytical processing (OLAP) queries. It is fast, scalable, flexible, cost-efficient, and easy to run. It delivers industry-leading query performance while significantly reducing storage requirements through its innovative use of columnar storage and compression.

    ClickHouse’s performance exceeds that of comparable column-oriented database management systems available on the market. It is a database management system, not a single database: ClickHouse allows creating tables and databases at runtime, loading data, and running queries without reconfiguring and restarting the server.

    ClickHouse processes anywhere from hundreds of millions to over a billion rows of data across clusters of hundreds of nodes. It utilizes all available hardware to process queries as fast as possible. Peak processing performance for a single query stands at more than two terabytes per second.

    What makes ClickHouse unique?

    • Data Storage & Compression: ClickHouse is designed to work on regular hard drives but will use SSDs and additional RAM if available. Data compression plays a crucial role in its performance: ClickHouse provides general-purpose compression codecs as well as specialized codecs for specific kinds of data. These codecs trade CPU consumption against disk space and help ClickHouse outperform other databases.
    • High Performance: Using vectorized computation, the engine processes data in vectors (chunks of columns), achieving high CPU efficiency. It supports parallel processing across multiple cores, so large queries parallelize naturally. ClickHouse also supports distributed query processing: data resides across shards, which are used to execute a query in parallel.
    • Primary & Secondary Indexes: Data is physically sorted by the primary key, allowing low-latency extraction of specific values or ranges. Secondary indexes in ClickHouse let the database know that some data parts cannot match a query’s filtering conditions and skip them entirely; therefore, they are also called data-skipping indexes.
    • Support for Approximated Calculations: ClickHouse can trade accuracy for performance through approximated calculations. It provides aggregate functions for approximate estimates of the number of distinct values, medians, and quantiles, and it can run a query on a sample of the data, reading proportionally less data from disk to produce an approximated result.
    • Data Replication and Data Integrity Support: ClickHouse keeps identical data on several replicas. After data is written to any available replica, the remaining replicas retrieve their copies in the background. Most failures are recovered automatically, or semi-automatically in complex scenarios.

    But it can’t be all good, can it? There are some disadvantages to ClickHouse as well:

    • No full-fledged transactions.
    • Inability to efficiently and precisely change or remove previously inserted data. For example, to comply with GDPR, data has to be cleaned up or modified using heavy batch deletes and updates.
    • ClickHouse is less efficient for point queries that retrieve individual rows by their keys due to the sparse index.

    ClickHouse against its contemporaries

    So, with all these distinctive features, how does ClickHouse compare with other industry-leading data storage tools? ClickHouse, being general-purpose, has a variety of use cases, each with its pros and cons, so here’s a high-level comparison against the best tools in their respective domains. Each tool has unique traits depending on the use case, and a comparison around those would not be fair; instead, we focus on performance, scalability, cost, and other key attributes that can be compared irrespective of the domain. So here we go:

    ClickHouse vs Snowflake:

    • With its decoupled storage & compute approach, Snowflake is able to segregate workloads and enhance performance. The search optimization service in Snowflake further enhances performance for point lookups but has additional costs attached to it. ClickHouse, on the other hand, with its local runtime and inherent support for multiple forms of indexing, drastically improves query performance.
    • Regarding scalability, ClickHouse being on-prem makes it slightly challenging to scale compared to Snowflake, which is cloud-based. Managing hardware manually by provisioning clusters and migrating is doable but tedious. One way to tackle this is to deploy ClickHouse on cloud infrastructure, which is both cheaper and, frankly, the most viable option.

    ClickHouse vs Redshift:

    • Redshift is a managed, scalable cloud data warehouse. It offers both provisioned and serverless options. Its RA3 nodes scale compute independently and cache the necessary data. Still, even with that, it cannot isolate different workloads running on the same data, putting it on the lower end of decoupled compute & storage cloud architectures. ClickHouse’s local runtime is one of the fastest.
    • Both Redshift and ClickHouse are columnar and store sorted data, allowing queries to read only the specific data they need. Deploying ClickHouse is cheaper, and although Redshift is tailored to be a ready-to-use tool, ClickHouse is the better choice if you are not entirely dependent on Redshift’s managed features like configuration, backup & monitoring.

    ClickHouse vs InfluxDB:

    • InfluxDB, an open-source NoSQL database written in Go, is one of the most popular choices for dealing with time-series data and analysis. Despite being a general-purpose analytical DB, ClickHouse provides competitive write performance.
    • ClickHouse’s data structures like AggregatingMergeTree allow real-time data to be stored in a pre-aggregated format, which puts it on par with TSDBs in performance. It is significantly faster for heavy queries and comparable in the case of light queries.

    ClickHouse vs PostgreSQL:

    • Postgres is another very versatile DB and, like ClickHouse, is widely used for a variety of use cases. Postgres, however, is an OLTP DB, so unlike ClickHouse, analytics is not its primary aim, though it is still used for analytics purposes to a certain extent.
    • In terms of transactional data, ClickHouse’s columnar nature puts it below Postgres, but when it comes to analytical capabilities, even after tuning Postgres to its maximum potential (e.g., by using materialized views, indexing, cache size, buffers, etc.), ClickHouse is ahead.

    ClickHouse vs Apache Druid:

    • Apache Druid is an open-source data store that is primarily used for OLAP. Both Druid & ClickHouse are very similar in terms of their approaches and use cases but differ in terms of their architecture. Druid is mainly used for real-time analytics with heavy ingestions and high uptime.
    • Unlike Druid, ClickHouse has a much simpler deployment. ClickHouse can be deployed on a single server, while a Druid setup needs multiple types of nodes (master, broker, ingestion, etc.). ClickHouse, with its SQL-like query language, also provides better flexibility, and it is more performant when the deployment is small.

    To summarize the differences between ClickHouse and other data warehouses:

    ClickHouse Engines

    Depending on the type of your table (internal or external), ClickHouse provides an array of engines that help us connect to different data storages and that determine the way data is stored and accessed, along with other interactions on it.

    These engines are mainly categorized into two types:

    Database Engines:

    These allow us to work with different databases & tables.
    By default, ClickHouse uses the Atomic database engine, which provides configurable table engines and an SQL dialect. There are also database engines for connecting to external databases; the popular ones are PostgreSQL, MySQL, and so on.

    Table Engines:

    These determine 

    • how and where data is stored
    • where to read/write it from/to
    • which queries it supports
    • use of indexes
    • concurrent data access and so on.

    These engines are further classified into families based on the above parameters:

    MergeTree Engines:

    This is the most universal and functional family of table engines for high-load tasks. The engines of this family support quick data insertion with subsequent background data processing. These engines also support data replication, partitioning, secondary data-skipping indexes, and some other features. Following are some of the popular engines in this family:

    • MergeTree
    • SummingMergeTree
    • AggregatingMergeTree

    MergeTree engines with indexing and partitioning support allow data to be processed at a tremendous speed. These can also be leveraged to form materialized views that store aggregated data further improving the performance.
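    As a rough illustration of the idea behind SummingMergeTree, here is a simplified Python sketch (conceptual only, not the actual merge algorithm; table and column names are made up):

    ```python
    # Conceptual sketch of a SummingMergeTree-style merge: during background
    # merges, rows sharing the same sorting key are collapsed by summing the
    # numeric column, so aggregate queries read far fewer rows.

    def summing_merge(parts):
        """Merge sorted data parts, summing the value column for equal keys."""
        totals = {}
        for part in parts:
            for key, value in part:
                totals[key] = totals.get(key, 0) + value
        return sorted(totals.items())  # merged part stays sorted by key

    # Two data parts written by separate inserts, keyed by page name:
    part1 = [("page_a", 3), ("page_b", 1)]
    part2 = [("page_a", 2), ("page_c", 5)]
    merged = summing_merge([part1, part2])
    ```

    In the real engine this collapsing happens lazily during background part merges, which is why queries against a SummingMergeTree should still aggregate with sum() rather than assume rows are already collapsed.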

    Log Engines:

    These are lightweight engines with minimal functionality. They work best when the requirement is to quickly write into many small tables and read them later as a whole. This family consists of:

    • Log
    • StripeLog
    • TinyLog

    These engines append data to the disk in a sequential fashion and support concurrent reading. They do not support indexing, updating, or deleting and hence are only useful when the data is small, sequential, and immutable.
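    The append-only behavior just described can be illustrated with a tiny sketch (conceptual Python, not ClickHouse internals):

    ```python
    # Conceptual sketch of a Log-family engine: data is only ever appended
    # sequentially, reads scan everything written so far, and there is no
    # index, update, or delete.

    class TinyLogSketch:
        def __init__(self):
            self._rows = []           # stands in for a sequential file on disk

        def append(self, row):
            self._rows.append(row)    # O(1) append; earlier data is never rewritten

        def scan(self):
            return list(self._rows)   # reads are always full sequential scans

    log = TinyLogSketch()
    log.append(("2023-01-01", "GET /"))
    log.append(("2023-01-02", "POST /books"))
    ```

    Because there is no index to consult or maintain, writes are cheap, but every read pays the cost of a full scan, which is why these engines suit only small, immutable tables.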

    Integration Engines:

    These are used for communicating with other data storage and processing systems. They support:

    • JDBC
    • MongoDB
    • HDFS
    • S3
    • Kafka and so on.

    Using these engines, we can import and export data from external sources. With an engine like Kafka, we can ingest data from a topic directly into a ClickHouse table, and with the S3 engine, we can work directly with S3 objects.

    Special Engines:

    ClickHouse offers some special engines that are specific to the use case. For example:

    • MaterializedView
    • Distributed
    • Merge
    • File and so on.

    These special engines have their own quirks; e.g., with File we can export data to a file, update data in the table by updating the file, etc.

    Summary

    We learned that ClickHouse is a very powerful and versatile tool: one with stellar performance that is feature-packed, very cost-efficient, and open-source. We saw a high-level comparison of ClickHouse with some of the best choices across an array of use cases. Although it ultimately comes down to how specific and intense your use case is, ClickHouse and its generic nature measure up pretty well on multiple occasions.

    ClickHouse’s applicability in web analytics, network management, log analysis, time-series analysis, asset valuation in financial markets, and security threat identification makes it tremendously versatile. Consistently solving business problems with low-latency responses over petabytes of data, ClickHouse is indeed one of the fastest data warehouses out there.

    Further Readings

  • Getting Started With Kubernetes Operators (Helm Based) – Part 1

    Introduction

    The concept of operators was introduced by CoreOS in the last quarter of 2016, and since the introduction of the Operator Framework last year, operators are rapidly becoming the standard way of managing applications on Kubernetes, especially those that are stateful in nature. In this blog post, we will learn what an operator is, why operators are needed, and what problems they solve. We will also create a Helm based operator as an example.

    This is the first part of our Kubernetes Operator Series. In the second part, getting started with Kubernetes operators (Ansible based), and the third part, getting started with Kubernetes operators (Golang based), you can learn how to build Ansible and Golang based operators.

    What is an Operator?

    Whenever we deploy our application on Kubernetes we leverage multiple Kubernetes objects like deployment, service, role, ingress, config map, etc. As our application gets complex and our requirements become non-generic, managing our application only with the help of native Kubernetes objects becomes difficult and we often need to introduce manual intervention or some other form of automation to make up for it.

    Operators solve this problem by making our application a first-class Kubernetes object: we no longer deploy our application as a set of native Kubernetes objects, but as a custom object/resource of its own kind with a more domain-specific schema, and we bake the “operational intelligence” or “domain-specific knowledge” into the controller responsible for maintaining the desired state of this object. For example, the etcd operator makes etcd-cluster a first-class object, and for deploying the cluster we create an object of the EtcdCluster kind. With operators, we are able to extend Kubernetes functionality for custom use cases and manage our applications in a Kubernetes-specific way, allowing us to leverage Kubernetes APIs and kubectl tooling.

    Operators combine CRDs (custom resource definitions) and custom controllers, and intend to eliminate the need for manual intervention (a human operator) when performing tasks like upgrades, failure recovery, or scaling of complex (often stateful) applications, making them more resilient and self-sufficient.
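    The control loop at the heart of any such controller can be sketched in a few lines (a conceptual Python sketch; a real controller would use the Kubernetes watch API and client libraries, and the field names here are illustrative):

    ```python
    # Conceptual sketch of an operator's reconcile loop: compare the desired
    # state (from the custom resource spec) with the observed cluster state
    # and return the actions needed to converge them.

    def reconcile(desired, observed):
        """Return the actions a controller would take to reach the desired state."""
        actions = []
        if observed.get("replicas") != desired["replicas"]:
            actions.append(("scale", desired["replicas"]))
        if observed.get("image") != desired["image"]:
            actions.append(("update_image", desired["image"]))
        return actions

    # A change to the CR spec (desired) triggers reconciliation against reality.
    desired = {"replicas": 3, "image": "bookstore:v2"}
    observed = {"replicas": 1, "image": "bookstore:v1"}
    actions = reconcile(desired, observed)
    ```

    When desired and observed already match, the loop does nothing; the "operational intelligence" lives in how the gap between the two is closed.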

    How to Build Operators ?

    For building and managing operators, we mostly leverage the Operator Framework, an open-source toolkit that allows us to build operators in a highly automated, scalable, and effective way. The Operator Framework comprises three subcomponents:

    1. Operator SDK: The Operator SDK is the most important component of the Operator Framework. It allows us to bootstrap our operator project in minutes. It exposes higher-level APIs and abstractions, saving developers from digging deep into Kubernetes APIs so they can focus on building the operational logic. It performs common tasks, like getting the controller to watch the custom resource (CR) for changes, as part of the project setup process.
    2. Operator Lifecycle Manager: Operators also run on the same Kubernetes clusters in which they manage applications, and more often than not we create multiple operators for multiple applications. The Operator Lifecycle Manager (OLM) provides a declarative way to install, upgrade, and manage all the operators and their dependencies in our cluster.
    3. Operator Metering: Operator Metering is currently an alpha project. It records historical cluster usage and can generate usage reports showing usage breakdown by pod or namespace over arbitrary time periods.

    Types of Operators

    Currently, there are three different types of operators we can build:

    1. Helm based operators: Helm based operators allow us to use our existing Helm charts and build operators from them. They are quite easy to build and are preferred for deploying stateless applications using the operator pattern.
    2. Ansible based operators: Ansible based operators allow us to use our existing Ansible playbooks and roles and build operators from them. These are also easy to build and generally preferred for stateless applications.
    3. Go based operators: Go based operators are built to solve the most complex use cases and are generally preferred for stateful applications. In the case of a Golang based operator, we build the controller logic ourselves, providing it with all our custom requirements. This type of operator is also relatively complex to build.

    Building a Helm based operator

    1. Let’s first install the Operator SDK:

    go get -d github.com/operator-framework/operator-sdk
    cd $GOPATH/src/github.com/operator-framework/operator-sdk
    git checkout master
    make dep
    make install

    Now we will have the operator-sdk binary in the $GOPATH/bin folder.      

    2.  Setup the project

    For building a Helm based operator, we can use an existing Helm chart. We will be using the book-store Helm chart, which deploys a simple Python app and MongoDB instances. This app allows us to perform CRUD operations via REST endpoints.

    Now we will use the operator-sdk to create our Helm based bookstore-operator project.

    operator-sdk new bookstore-operator --api-version=velotio.com/v1alpha1 --kind=BookStore --type=helm --helm-chart=book-store --helm-chart-repo=https://akash-gautam.github.io/helmcharts/

    In the above command, bookstore-operator is the name of our operator/project, --kind specifies the kind of objects this operator will watch, and --api-version is used for versioning of this object. The Operator SDK takes only this much information and creates the custom resource definition (CRD) and a custom resource (CR) of its type for us (remember the high-level abstraction the Operator SDK provides). The above command bootstraps a project with the below folder structure.

    bookstore-operator/
    |
    |- build/ # Contains the Dockerfile to build the operator image
    |- deploy/ # Contains the crd,cr and manifest files for deploying operator
    |- helm-charts/ # Contains the helm chart we used while creating the project
    |- watches.yaml # Specifies the resource the operator watches (maintains the state of)

    We discussed that the operator-sdk automates setting up operator projects, and that is exactly what we can observe here. Under the build folder, we have the Dockerfile to build our operator image. Under the deploy folder, we have a crds folder containing both the CRD and the CR. This folder also has the operator.yaml file, which we will use to run the operator in our cluster, along with manifest files for the role, rolebinding, and service account to be used while deploying the operator. We have our book-store Helm chart under helm-charts. The watches.yaml file looks like this:

    ---
    - version: v1alpha1
      group: velotio.com
      kind: BookStore
      chart: /opt/helm/helm-charts/book-store

    We can see that the bookstore-operator watches events related to BookStore kind objects and executes the helm chart specified.

    If we take a look at the cr file under deploy/crds (velotio_v1alpha1_bookstore_cr.yaml) folder then we can see that it looks just like the values.yaml file of our book-store helm chart.

    apiVersion: velotio.com/v1alpha1
    kind: BookStore
    metadata:
      name: example-bookstore
    spec:
      # Default values copied from <project_dir>/helm-charts/book-store/values.yaml
      
      # Default values for book-store.
      # This is a YAML-formatted file.
      # Declare variables to be passed into your templates.
      
      replicaCount: 1
      
      image:
        app:
          repository: akash125/pyapp
          tag: latest
          pullPolicy: IfNotPresent
        mongodb:
          repository: mongo
          tag: latest
          pullPolicy: IfNotPresent
          
      service:
        app:
          type: LoadBalancer
          port: 80
          targetPort: 3000
        mongodb:
          type: ClusterIP
          port: 27017
          targetPort: 27017
      
      
      resources: {}
        # We usually recommend not to specify default resources and to leave this as a conscious
        # choice for the user. This also increases chances charts run on environments with little
        # resources, such as Minikube. If you do want to specify resources, uncomment the following
        # lines, adjust them as necessary, and remove the curly braces after 'resources:'.
        # limits:
        #  cpu: 100m
        #  memory: 128Mi
        # requests:
        #  cpu: 100m
        #  memory: 128Mi
      
      nodeSelector: {}
      
      tolerations: []
      
      affinity: {}

    In the case of Helm charts, we use the values.yaml file to pass parameters to our Helm releases; a Helm based operator converts all these configurable parameters into the spec of our custom resource. This allows us to express values.yaml as a custom resource (CR) which, as a native Kubernetes object, gains the benefits of RBAC and an audit trail. Now, when we want to update our deployed app, we can simply modify the CR and apply it, and the operator will ensure that the changes we made are reflected in our app.
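    For example, to scale the bookstore app, we could bump replicaCount in the CR and re-apply it (a hypothetical edit of the CR shown above; only the changed field is annotated):

    ```yaml
    apiVersion: velotio.com/v1alpha1
    kind: BookStore
    metadata:
      name: example-bookstore
    spec:
      replicaCount: 3   # scaled up from 1; the operator upgrades the Helm release to match
    ```

    Applying the updated CR with kubectl is all that is needed; we never run a Helm upgrade by hand.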

    For each object of `BookStore` kind, the bookstore-operator will perform the following actions:

    1. Create the bookstore app deployment if it doesn’t exist.
    2. Create the bookstore app service if it doesn’t exist.
    3. Create the mongodb deployment if it doesn’t exist.
    4. Create the mongodb service if it doesn’t exist.
    5. Ensure deployments and services match their desired configurations like the replica count, image tag, service port etc.  

    3. Build the Bookstore-operator Image

    The Dockerfile for building the operator image is already in our build folder; we need to run the below command from the root folder of our operator project to build the image.

    operator-sdk build akash125/bookstore-operator:v0.0.1

    4. Run the Bookstore-operator

    As we have our operator image ready, we can now go ahead and run it. The deployment file for the operator (operator.yaml under the deploy folder) was created as part of our project setup; we just need to set the image for this deployment to the one we built in the previous step.

    After updating the image in the operator.yaml we are ready to deploy the operator.

    kubectl create -f deploy/service_account.yaml
    kubectl create -f deploy/role.yaml
    kubectl create -f deploy/role_binding.yaml
    kubectl create -f deploy/operator.yaml

    Note: The role created might have more permissions than actually required for the operator, so it is always a good idea to review it and trim down the permissions in production setups.

    Verify that the operator pod is in running state.

    5. Deploy the Bookstore App

    Now that we have the bookstore-operator running in our cluster, we just need to create the custom resource for deploying our bookstore app.

    First, before we can create the BookStore CR, we need to register its CRD.

    kubectl apply -f deploy/crds/velotio_v1alpha1_bookstore_crd.yaml

    Now we can create the bookstore object.

    kubectl apply -f deploy/crds/velotio_v1alpha1_bookstore_cr.yaml

    Now we can see that our operator has deployed our book-store app.

    Now let’s grab the external IP of the app and make some requests to store details of books.

    Let’s hit the external IP on the browser and see if it lists the books we just stored:

    The bookstore operator build is available here.

    Conclusion

    Since its early days, Kubernetes has been considered a great tool for managing stateless applications, but managing stateful applications on Kubernetes was always considered difficult. Operators are a big leap towards managing stateful applications, and other complex distributed, multi (poly) cloud workloads, with the same ease with which we manage stateless applications. In this blog post, we learned the basics of Kubernetes operators and built a simple Helm based operator. In the next installment of this blog series, we will build an Ansible based Kubernetes operator, and then in the last post we will build a full-fledged Golang based operator for managing stateful workloads.

    Related Reads:

  • How to Make Your Terminal More Productive with Z-Shell (ZSH)

    When working with servers or command-line-based applications, we spend most of our time on the command line. A good-looking and productive terminal is better in many aspects than a GUI (Graphical User Interface) environment since the command line takes less time for most use cases. Today, we’ll look at some of the features that make a terminal cool and productive.

    You can use the following steps on Ubuntu 20.04. If you are using a different operating system, your commands will likely differ. If you’re using Windows, you can choose between Cygwin, WSL, and Git Bash.

    Prerequisites

    Let’s upgrade the system and install some basic tools needed.

    sudo apt update && sudo apt upgrade
    sudo apt install build-essential curl wget git

    Z-Shell (ZSH)

    Zsh is an extended Bourne shell with many improvements, including some features of Bash and other shells.

    Let’s install Z-Shell:

    sudo apt install zsh

    Make it our default shell for our terminal:

    chsh -s $(which zsh)

    Now restart the system and open the terminal again to be welcomed by ZSH. Unlike other shells like Bash, ZSH requires some initial configuration, so the first time we start it, it asks for some configuration options and saves them in a file called .zshrc in the home directory (/home/user, where user is the current system user).

    For now, we’ll skip the manual work and get a head start with the default configuration. Press 2, and ZSH will populate the .zshrc file with some default options. We can change these later.  

    The initial configuration setup can be run again later with the zsh-newuser-install function.

    Oh-My-ZSH

    Oh-My-ZSH is a community-driven, open-source framework to manage your ZSH configuration. It comes with many plugins and helpers. It can be installed with one single command as below.

    Installation

    sh -c "$(wget https://raw.github.com/ohmyzsh/ohmyzsh/master/tools/install.sh -O -)"

    The installer takes a backup of our existing .zshrc in a file called .zshrc.pre-oh-my-zsh, so whenever you uninstall Oh-My-ZSH, the backup is restored automatically.

    Font

    A good terminal needs some good fonts. We’ll use the Terminess nerd font to make our terminal look awesome, which can be downloaded here. Once downloaded, extract the fonts and move them to ~/.local/share/fonts to make them available for the current user, or to /usr/share/fonts to make them available for all users.

    unzip Terminess.zip
    mv *.ttf ~/.local/share/fonts

    Once the font is installed, set it as your terminal’s font to see it in action.

    Among all the things Oh-My-ZSH provides, two are community favorites: plugins and themes.

    Theme

    My go-to ZSH theme is powerlevel10k because it’s flexible, provides everything out of the box, and is easy to install with one command as shown below:

    git clone --depth=1 https://github.com/romkatv/powerlevel10k.git ${ZSH_CUSTOM:-$HOME/.oh-my-zsh/custom}/themes/powerlevel10k

    To set this theme in .zshrc:
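    The snippet below shows the relevant line (the theme path assumes the clone location used in the command above):

    ```shell
    # in ~/.zshrc
    ZSH_THEME="powerlevel10k/powerlevel10k"
    ```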

    Close the terminal and start it again. Powerlevel10k will welcome you with its initial setup; go through it with the options you want. You can run this setup again by executing the below command:

    p10k configure

    Tools and plugins we can’t live without

    Plugins can be added to the plugins array in the .zshrc file. For each plugin you want to use from the list below, add it to the plugins array in the .zshrc file (the complete array is shown at the end of this section).

    ZSH-Syntax-Highlighting

    This enables the highlighting of commands as you type and helps you catch syntax errors before you execute them:

    As you can see, “ls” is in green but “lss” is in red.

    Execute the below command to install it:

    git clone https://github.com/zsh-users/zsh-syntax-highlighting.git ${ZSH_CUSTOM:-~/.oh-my-zsh/custom}/plugins/zsh-syntax-highlighting

    ZSH Autosuggestions

    This suggests commands as you type based on your history:

    You can install it by cloning the git repo with the below command:

    git clone https://github.com/zsh-users/zsh-autosuggestions ${ZSH_CUSTOM:-~/.oh-my-zsh/custom}/plugins/zsh-autosuggestions

    ZSH Completions

    For some extra ZSH completion scripts, execute the below command:

    git clone https://github.com/zsh-users/zsh-completions ${ZSH_CUSTOM:=~/.oh-my-zsh/custom}/plugins/zsh-completions 

    autojump

    It’s a faster way of navigating the file system; it works by maintaining a database of the directories you visit most. More details can be found here.

    sudo apt install autojump 

    You can also use the plugin Z as an alternative if you’re not able to install autojump or for any other reason.

    Internal Plugins

    Some plugins come installed with Oh-My-ZSH, and they can be included directly in the .zshrc file without any installation.

    copyfile

    It copies the content of a file to the clipboard.

    copyfile test.txt

    copypath

    It copies the absolute path of the current directory to the clipboard.

    copybuffer

    This plugin copies the command that is currently typed in the command prompt to the clipboard. It works with the keyboard shortcut CTRL + o.

    sudo

    Sometimes, we forget to prefix a command with sudo, but that can be done in just a second with this plugin. When you hit the ESC key twice, it will prefix the command you’ve typed in the terminal with sudo.

    web-search

    This adds some aliases for searching with Google, Wikipedia, etc. For example, if you want to web-search with Google, you can execute the below command:

    google oh my zsh

    Doing so will open this search in Google:

    More details can be found here.

    Remember, you’ll have to add each of these plugins to the .zshrc file as well. So, in the end, this is how the plugins array in the .zshrc file should look:

    plugins=(
            zsh-autosuggestions
            zsh-syntax-highlighting
            zsh-completions
            autojump
            copyfile
            copypath
            copybuffer
            history
            dirhistory
            sudo
            web-search
            git
    ) 

    You can add more plugins, like docker, heroku, kubectl, npm, jsontools, etc., if you’re a developer. There are plugins for system admins as well or for anything else you need. You can explore them here.

    Enhancd

    Enhancd is a next-generation way to navigate the file system from the CLI. It works with a fuzzy finder; we’ll install fzf for this purpose.

    sudo apt install fzf

    Enhancd can be installed with the zplug plugin manager for ZSH, so first we’ll install zplug with the below command:

    $ curl -sL --proto-redir -all,https https://raw.githubusercontent.com/zplug/installer/master/installer.zsh | zsh

    Append the following to .zshrc:

    source ~/.zplug/init.zsh
    zplug load

    Now close your terminal, open it again, and use zplug to install enhancd by adding the below line to .zshrc (before the zplug load line) and then running zplug install:

    zplug "b4b4r07/enhancd", use:init.sh

    Aliases

    As a developer, I need to execute git commands many times a day, and typing each command in full every time is too cumbersome, so we can use aliases for them. Aliases need to be added to .zshrc; here’s how we can add them.

    alias gs='git status'
    alias ga='git add .'
    alias gf='git fetch'
    alias gr='git rebase'
    alias gp='git push'
    alias gd='git diff'
    alias gc='git commit'
    alias gh='git checkout'
    alias gst='git stash'
    alias gl='git log --oneline --graph'

    You can add these anywhere in the .zshrc file.

    Colorls

    Another tool that makes you say wow is Colorls. This tool colorizes the output of the ls command. This is how it looks once you install it:

    It works with Ruby; below is how you can install both Ruby and Colorls:

    sudo apt install ruby ruby-dev ruby-colorize
    sudo gem install colorls

    Now, restart your terminal and execute the command colorls in your terminal to see the magic!

    Bonus: we can add some aliases as well if we want the same output as Colorls when we execute the command ls. Note the backslash in the cl alias: it bypasses the ls alias defined below, so cl still runs the original ls.

    alias cl='\ls'
    alias ls='colorls'
    alias la='colorls -a'
    alias ll='colorls -l'
    alias lla='colorls -la'

    These are the tools and plugins I can’t live without now. Let me know if I’ve missed anything.

    Automation

    Would you want to repeat this whole process if, say, you bought a new laptop and wanted the same setup?

    If your answer is no, you can automate all of this, and that’s why I’ve created Project Automator. The project does a lot more than just set up a terminal: it targets Arch Linux as of now, but you can take the parts you need and make it work with almost any *nix system you like.

    Explaining how it works is beyond the scope of this article, so I’ll have to leave you guys here to explore it on your own.

    Conclusion

    We need to perform many tasks on our systems, and using a GUI (Graphical User Interface) tool can consume a lot of your time, especially for tasks you repeat on a daily basis, like converting a media stream or setting up tools on a system.

    Using a command-line tool can save you a lot of time and you can automate repetitive tasks with scripting. It can be a great tool for your arsenal.

  • Deploy Serverless, Event-driven Python Applications Using Zappa

    Introduction

    Zappa is a very powerful open-source Python project that lets you build, deploy, and update your WSGI app hosted on AWS Lambda + API Gateway easily. This blog is a detailed step-by-step guide focusing on the challenges faced while deploying a Django application on AWS Lambda using Zappa as the deployment tool.
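    For context, Zappa drives deployments from a zappa_settings.json file at the project root. A minimal one looks roughly like this (the settings module, bucket, and region names below are placeholders, not values from this repository):

    ```json
    {
        "dev": {
            "django_settings": "myproject.settings",
            "project_name": "django-zappa-sample",
            "runtime": "python2.7",
            "s3_bucket": "my-zappa-deployments",
            "aws_region": "us-east-1"
        }
    }
    ```

    Running zappa init generates this file interactively, and zappa deploy dev / zappa update dev then use it to package and ship the app.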

    Building Your Application

    If you do not have a Django application already, you can build one by cloning this GitHub repository.

    $ git clone https://github.com/velotiotech/django-zappa-sample.git    

    Cloning into 'django-zappa-sample'...
    remote: Counting objects: 18, done.
    remote: Compressing objects: 100% (13/13), done.
    remote: Total 18 (delta 1), reused 15 (delta 1), pack-reused 0
    Unpacking objects: 100% (18/18), done.
    Checking connectivity... done.

    Once you have cloned the repository, you will need a virtual environment, which provides an isolated Python environment for your application. I prefer virtualenvwrapper to create one.

    Command :

    $ mkvirtualenv django_zappa_sample 

    Installing setuptools, pip, wheel...done.
    virtualenvwrapper.user_scripts creating /home/velotio/Envs/django_zappa_sample/bin/predeactivate
    virtualenvwrapper.user_scripts creating /home/velotio/Envs/django_zappa_sample/bin/postdeactivate
    virtualenvwrapper.user_scripts creating /home/velotio/Envs/django_zappa_sample/bin/preactivate
    virtualenvwrapper.user_scripts creating /home/velotio/Envs/django_zappa_sample/bin/postactivate
    virtualenvwrapper.user_scripts creating /home/velotio/Envs/django_zappa_sample/bin/get_env_details

    Install dependencies from requirements.txt.

    $ pip install -r requirements.txt

    Collecting Django==1.11.11 (from -r requirements.txt (line 1))
      Downloading https://files.pythonhosted.org/packages/d5/bf/2cd5eb314aa2b89855c01259c94dc48dbd9be6c269370c1f7ae4979e6e2f/Django-1.11.11-py2.py3-none-any.whl (6.9MB)
        100% |████████████████████████████████| 7.0MB 772kB/s 
    Collecting zappa==0.45.1 (from -r requirements.txt (line 2))
    Collecting pytz (from Django==1.11.11->-r requirements.txt (line 1))
      Downloading https://files.pythonhosted.org/packages/dc/83/15f7833b70d3e067ca91467ca245bae0f6fe56ddc7451aa0dc5606b120f2/pytz-2018.4-py2.py3-none-any.whl (510kB)
        100% |████████████████████████████████| 512kB 857kB/s 
    Collecting future==0.16.0 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting toml>=0.9.3 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting docutils>=0.12 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/50/09/c53398e0005b11f7ffb27b7aa720c617aba53be4fb4f4f3f06b9b5c60f28/docutils-0.14-py2-none-any.whl
    Collecting PyYAML==3.12 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting futures==3.1.1 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/a6/1c/72a18c8c7502ee1b38a604a5c5243aa8c2a64f4bba4e6631b1b8972235dd/futures-3.1.1-py2-none-any.whl
    Requirement already satisfied: wheel>=0.30.0 in /home/velotio/Envs/django_zappa_sample/lib/python2.7/site-packages (from zappa==0.45.1->-r requirements.txt (line 2)) (0.31.1)
    Collecting base58==0.2.4 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting durationpy==0.5 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting kappa==0.6.0 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/ed/cf/a8aa5964557c8a4828da23d210f8827f9ff190318838b382a4fb6f118f5d/kappa-0.6.0-py2-none-any.whl
    Collecting Werkzeug==0.12 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/ae/c3/f59f6ade89c811143272161aae8a7898735e7439b9e182d03d141de4804f/Werkzeug-0.12-py2.py3-none-any.whl
    Collecting boto3>=1.4.7 (from zappa==0.45.1->-r requirements.txt (line 2))
      Downloading https://files.pythonhosted.org/packages/cd/a3/4d1caf76d8f5aac8ab1ffb4924ecf0a43df1572f6f9a13465a482f94e61c/boto3-1.7.24-py2.py3-none-any.whl (128kB)
        100% |████████████████████████████████| 133kB 1.1MB/s 
    Collecting six>=1.11.0 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/67/4b/141a581104b1f6397bfa78ac9d43d8ad29a7ca43ea90a2d863fe3056e86a/six-1.11.0-py2.py3-none-any.whl
    Collecting tqdm==4.19.1 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/c0/d3/7f930cbfcafae3836be39dd3ed9b77e5bb177bdcf587a80b6cd1c7b85e74/tqdm-4.19.1-py2.py3-none-any.whl
    Collecting argcomplete==1.9.2 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/0f/ee/625763d848016115695942dba31a9937679a25622b6f529a2607d51bfbaa/argcomplete-1.9.2-py2.py3-none-any.whl
    Collecting hjson==3.0.1 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting troposphere>=1.9.0 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting python-dateutil==2.6.1 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/4b/0d/7ed381ab4fe80b8ebf34411d14f253e1cf3e56e2820ffa1d8844b23859a2/python_dateutil-2.6.1-py2.py3-none-any.whl
    Collecting botocore>=1.7.19 (from zappa==0.45.1->-r requirements.txt (line 2))
      Downloading https://files.pythonhosted.org/packages/65/98/12aa979ca3215d69111026405a9812d7bb0c9ae49e2800b00d3bd794705b/botocore-1.10.24-py2.py3-none-any.whl (4.2MB)
        100% |████████████████████████████████| 4.2MB 768kB/s 
    Collecting requests>=2.10.0 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/49/df/50aa1999ab9bde74656c2919d9c0c085fd2b3775fd3eca826012bef76d8c/requests-2.18.4-py2.py3-none-any.whl
    Collecting jmespath==0.9.3 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/b7/31/05c8d001f7f87f0f07289a5fc0fc3832e9a57f2dbd4d3b0fee70e0d51365/jmespath-0.9.3-py2.py3-none-any.whl
    Collecting wsgi-request-logger==0.4.6 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting lambda-packages==0.19.0 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting python-slugify==1.2.4 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/9f/77/ab7134b731d0e831cf82861c1ab0bb318e80c41155fa9da18958f9d96057/python_slugify-1.2.4-py2.py3-none-any.whl
    Collecting placebo>=0.8.1 (from kappa==0.6.0->zappa==0.45.1->-r requirements.txt (line 2))
    Collecting click>=5.1 (from kappa==0.6.0->zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/34/c1/8806f99713ddb993c5366c362b2f908f18269f8d792aff1abfd700775a77/click-6.7-py2.py3-none-any.whl
    Collecting s3transfer<0.2.0,>=0.1.10 (from boto3>=1.4.7->zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/d7/14/2a0004d487464d120c9fb85313a75cd3d71a7506955be458eebfe19a6b1d/s3transfer-0.1.13-py2.py3-none-any.whl
    Collecting cfn-flip>=0.2.5 (from troposphere>=1.9.0->zappa==0.45.1->-r requirements.txt (line 2))
    Collecting certifi>=2017.4.17 (from requests>=2.10.0->zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/7c/e6/92ad559b7192d846975fc916b65f667c7b8c3a32bea7372340bfe9a15fa5/certifi-2018.4.16-py2.py3-none-any.whl
    Collecting chardet<3.1.0,>=3.0.2 (from requests>=2.10.0->zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/bc/a9/01ffebfb562e4274b6487b4bb1ddec7ca55ec7510b22e4c51f14098443b8/chardet-3.0.4-py2.py3-none-any.whl
    Collecting idna<2.7,>=2.5 (from requests>=2.10.0->zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/27/cc/6dd9a3869f15c2edfab863b992838277279ce92663d334df9ecf5106f5c6/idna-2.6-py2.py3-none-any.whl
    Collecting urllib3<1.23,>=1.21.1 (from requests>=2.10.0->zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/63/cb/6965947c13a94236f6d4b8223e21beb4d576dc72e8130bd7880f600839b8/urllib3-1.22-py2.py3-none-any.whl
    Collecting Unidecode>=0.04.16 (from python-slugify==1.2.4->zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/59/ef/67085e30e8bbcdd76e2f0a4ad8151c13a2c5bce77c85f8cad6e1f16fb141/Unidecode-1.0.22-py2.py3-none-any.whl
    Installing collected packages: pytz, Django, future, toml, docutils, PyYAML, futures, base58, durationpy, jmespath, six, python-dateutil, botocore, s3transfer, boto3, placebo, click, kappa, Werkzeug, tqdm, argcomplete, hjson, cfn-flip, troposphere, certifi, chardet, idna, urllib3, requests, wsgi-request-logger, lambda-packages, Unidecode, python-slugify, zappa
    Successfully installed Django-1.11.11 PyYAML-3.12 Unidecode-1.0.22 Werkzeug-0.12 argcomplete-1.9.2 base58-0.2.4 boto3-1.7.24 botocore-1.10.24 certifi-2018.4.16 cfn-flip-1.0.3 chardet-3.0.4 click-6.7 docutils-0.14 durationpy-0.5 future-0.16.0 futures-3.1.1 hjson-3.0.1 idna-2.6 jmespath-0.9.3 kappa-0.6.0 lambda-packages-0.19.0 placebo-0.8.1 python-dateutil-2.6.1 python-slugify-1.2.4 pytz-2018.4 requests-2.18.4 s3transfer-0.1.13 six-1.11.0 toml-0.9.4 tqdm-4.19.1 troposphere-2.2.1 urllib3-1.22 wsgi-request-logger-0.4.6 zappa-0.45.1
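    For reference, the requirements.txt in the sample repository pins only the two top-level packages; everything else in the output above is pulled in as a transitive dependency:

```
Django==1.11.11
zappa==0.45.1
```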

    Now, if you run the server directly, it will log a warning because the database is not set up yet.

    $ python manage.py runserver  

    Performing system checks...
    
    System check identified no issues (0 silenced).
    
    You have 13 unapplied migration(s). Your project may not work properly until you apply the migrations for app(s): admin, auth, contenttypes, sessions.
    Run 'python manage.py migrate' to apply them.
    
    May 20, 2018 - 14:47:32
    Django version 1.11.11, using settings 'django_zappa_sample.settings'
    Starting development server at http://127.0.0.1:8000/
    Quit the server with CONTROL-C.

    Trying to access the admin page (http://localhost:8000/admin/) will also throw an “OperationalError” exception, with the following log on the server side.

    Internal Server Error: /admin/
    Traceback (most recent call last):
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/core/handlers/exception.py", line 41, in inner
        response = get_response(request)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/core/handlers/base.py", line 187, in _get_response
        response = self.process_exception_by_middleware(e, request)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/core/handlers/base.py", line 185, in _get_response
        response = wrapped_callback(request, *callback_args, **callback_kwargs)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/admin/sites.py", line 242, in wrapper
        return self.admin_view(view, cacheable)(*args, **kwargs)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/utils/decorators.py", line 149, in _wrapped_view
        response = view_func(request, *args, **kwargs)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/views/decorators/cache.py", line 57, in _wrapped_view_func
        response = view_func(request, *args, **kwargs)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/admin/sites.py", line 213, in inner
        if not self.has_permission(request):
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/admin/sites.py", line 187, in has_permission
        return request.user.is_active and request.user.is_staff
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/utils/functional.py", line 238, in inner
        self._setup()
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/utils/functional.py", line 386, in _setup
        self._wrapped = self._setupfunc()
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/auth/middleware.py", line 24, in <lambda>
        request.user = SimpleLazyObject(lambda: get_user(request))
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/auth/middleware.py", line 12, in get_user
        request._cached_user = auth.get_user(request)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/auth/__init__.py", line 211, in get_user
        user_id = _get_user_session_key(request)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/auth/__init__.py", line 61, in _get_user_session_key
        return get_user_model()._meta.pk.to_python(request.session[SESSION_KEY])
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/sessions/backends/base.py", line 57, in __getitem__
        return self._session[key]
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/sessions/backends/base.py", line 207, in _get_session
        self._session_cache = self.load()
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/sessions/backends/db.py", line 35, in load
        expire_date__gt=timezone.now()
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/db/models/manager.py", line 85, in manager_method
        return getattr(self.get_queryset(), name)(*args, **kwargs)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/db/models/query.py", line 374, in get
        num = len(clone)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/db/models/query.py", line 232, in __len__
        self._fetch_all()
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/db/models/query.py", line 1118, in _fetch_all
        self._result_cache = list(self._iterable_class(self))
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/db/models/query.py", line 53, in __iter__
        results = compiler.execute_sql(chunked_fetch=self.chunked_fetch)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 899, in execute_sql
        raise original_exception
    OperationalError: no such table: django_session
    [20/May/2018 14:59:23] "GET /admin/ HTTP/1.1" 500 153553
    Not Found: /favicon.ico

    To fix this, run the migrations against your database so that essential tables (auth_user, django_session, etc.) are created before any request is made to the server.

    $ python manage.py migrate 

    Operations to perform:
      Apply all migrations: admin, auth, contenttypes, sessions
    Running migrations:
      Applying contenttypes.0001_initial... OK
      Applying auth.0001_initial... OK
      Applying admin.0001_initial... OK
      Applying admin.0002_logentry_remove_auto_add... OK
      Applying contenttypes.0002_remove_content_type_name... OK
      Applying auth.0002_alter_permission_name_max_length... OK
      Applying auth.0003_alter_user_email_max_length... OK
      Applying auth.0004_alter_user_username_opts... OK
      Applying auth.0005_alter_user_last_login_null... OK
      Applying auth.0006_require_contenttypes_0002... OK
      Applying auth.0007_alter_validators_add_error_messages... OK
      Applying auth.0008_alter_user_username_max_length... OK
      Applying sessions.0001_initial... OK

    NOTE: Use the DATABASES setting in your project settings file to configure the database you want your Django application to use once it is hosted on AWS Lambda. By default, it is configured to create a local SQLite database file as the backend.
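    Since Lambda's filesystem is ephemeral, a local SQLite file will not persist between invocations once the app is deployed, so in practice you will want to point Django at a network database. A minimal sketch, assuming a hypothetical PostgreSQL instance on RDS (all hostnames and credentials below are placeholders):

```python
# settings.py (sketch) -- point Django at a network database instead of the
# default local SQLite file. Every value below is a placeholder; substitute
# your own instance details, ideally via environment variables as shown.
import os

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': os.environ.get('DB_NAME', 'django_zappa_sample'),
        'USER': os.environ.get('DB_USER', 'django'),
        'PASSWORD': os.environ.get('DB_PASSWORD', ''),
        # Hypothetical RDS endpoint -- replace with your own.
        'HOST': os.environ.get('DB_HOST', 'mydb.example.us-east-1.rds.amazonaws.com'),
        'PORT': os.environ.get('DB_PORT', '5432'),
    }
}
```

    Note that binary drivers such as psycopg2 need Lambda-compatible builds; Zappa's lambda-packages dependency (visible in the pip install output earlier) ships precompiled versions of common ones.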

    You can run the server again and it should now load the admin panel of your website.

    Before moving forward, verify that the zappa Python package is installed in your virtual environment.

    Configuring Zappa Settings

    Deploying with Zappa is simple: it only needs a configuration file, and the rest is managed by Zappa. To create this configuration file, run the following from your project root directory:

    $ zappa init 

    ███████╗ █████╗ ██████╗ ██████╗  █████╗
    ╚══███╔╝██╔══██╗██╔══██╗██╔══██╗██╔══██╗
      ███╔╝ ███████║██████╔╝██████╔╝███████║
     ███╔╝  ██╔══██║██╔═══╝ ██╔═══╝ ██╔══██║
    ███████╗██║  ██║██║     ██║     ██║  ██║
    ╚══════╝╚═╝  ╚═╝╚═╝     ╚═╝     ╚═╝  ╚═╝
    
    Welcome to Zappa!
    
    Zappa is a system for running server-less Python web applications on AWS Lambda and AWS API Gateway.
    This `init` command will help you create and configure your new Zappa deployment.
    Let's get started!
    
    Your Zappa configuration can support multiple production stages, like 'dev', 'staging', and 'production'.
    What do you want to call this environment (default 'dev'): 
    
    AWS Lambda and API Gateway are only available in certain regions. Let's check to make sure you have a profile set up in one that will work.
    We found the following profiles: default, and hdx. Which would you like us to use? (default 'default'): 
    
    Your Zappa deployments will need to be uploaded to a private S3 bucket.
    If you don't have a bucket yet, we'll create one for you too.
    What do you want call your bucket? (default 'zappa-108wqhyn4'): django-zappa-sample-bucket
    
    It looks like this is a Django application!
    What is the module path to your projects's Django settings?
    We discovered: django_zappa_sample.settings
    Where are your project's settings? (default 'django_zappa_sample.settings'): 
    
    You can optionally deploy to all available regions in order to provide fast global service.
    If you are using Zappa for the first time, you probably don't want to do this!
    Would you like to deploy this application globally? (default 'n') [y/n/(p)rimary]: n
    
    Okay, here's your zappa_settings.json:
    
    {
        "dev": {
            "aws_region": "us-east-1", 
            "django_settings": "django_zappa_sample.settings", 
            "profile_name": "default", 
            "project_name": "django-zappa-sa", 
            "runtime": "python2.7", 
            "s3_bucket": "django-zappa-sample-bucket"
        }
    }
    
    Does this look okay? (default 'y') [y/n]: y
    
    Done! Now you can deploy your Zappa application by executing:
    
    	$ zappa deploy dev
    
    After that, you can update your application code with:
    
    	$ zappa update dev
    
    To learn more, check out our project page on GitHub here: https://github.com/Miserlou/Zappa
    and stop by our Slack channel here: https://slack.zappa.io
    
    Enjoy!,
     ~ Team Zappa!

    You can verify the generated zappa_settings.json at your project root directory.

    TIP: The virtual environment name should not be the same as the Zappa project name, as this may cause errors.

    Additionally, you can specify other settings in the zappa_settings.json file as required, using the Advanced Settings.
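    For instance, a sketch of zappa_settings.json with a few commonly used options added to the generated ones (the values here are illustrative, not recommendations):

```json
{
    "dev": {
        "aws_region": "us-east-1",
        "django_settings": "django_zappa_sample.settings",
        "profile_name": "default",
        "project_name": "django-zappa-sa",
        "runtime": "python2.7",
        "s3_bucket": "django-zappa-sample-bucket",
        "memory_size": 512,
        "timeout_seconds": 30,
        "keep_warm": true,
        "log_level": "DEBUG"
    }
}
```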

    Now, you’re ready to deploy!

    IAM Permissions

    In order to deploy the Django application to Lambda/API Gateway, set up an IAM role (e.g. ZappaLambdaExecutionRole) with the following permissions:

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "iam:AttachRolePolicy",
                    "iam:CreateRole",
                    "iam:GetRole",
                    "iam:PutRolePolicy"
                ],
                "Resource": [
                    "*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iam:PassRole"
                ],
                "Resource": [
                    "arn:aws:iam:::role/*-ZappaLambdaExecutionRole"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "apigateway:DELETE",
                    "apigateway:GET",
                    "apigateway:PATCH",
                    "apigateway:POST",
                    "apigateway:PUT",
                    "events:DeleteRule",
                    "events:DescribeRule",
                    "events:ListRules",
                    "events:ListTargetsByRule",
                    "events:ListRuleNamesByTarget",
                    "events:PutRule",
                    "events:PutTargets",
                    "events:RemoveTargets",
                    "lambda:AddPermission",
                    "lambda:CreateFunction",
                    "lambda:DeleteFunction",
                    "lambda:GetFunction",
                    "lambda:GetPolicy",
                    "lambda:ListVersionsByFunction",
                    "lambda:RemovePermission",
                    "lambda:UpdateFunctionCode",
                    "lambda:UpdateFunctionConfiguration",
                    "cloudformation:CreateStack",
                    "cloudformation:DeleteStack",
                    "cloudformation:DescribeStackResource",
                    "cloudformation:DescribeStacks",
                    "cloudformation:ListStackResources",
                    "cloudformation:UpdateStack",
                    "logs:DescribeLogStreams",
                    "logs:FilterLogEvents",
                    "route53:ListHostedZones",
                    "route53:ChangeResourceRecordSets",
                    "route53:GetHostedZone",
                    "s3:CreateBucket"
                ],
                "Resource": [
                    "*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:DeleteObject",
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:CreateMultipartUpload",
                    "s3:AbortMultipartUpload",
                    "s3:ListMultipartUploadParts",
                    "s3:ListBucketMultipartUploads"
                ],
                "Resource": [
                    "arn:aws:s3:::/*"
                ]
            }
        ]
    }

    Deploying Django Application

    Before deploying the application, ensure that the IAM role is set in the config JSON as follows:

    {
        "dev": {
            ...
            "manage_roles": false, // Disable Zappa client managing roles.
            "role_name": "MyLambdaRole", // Name of your Zappa execution role. Optional, default: <project_name>-<stage>-ZappaExecutionRole.
            "role_arn": "arn:aws:iam::12345:role/app-ZappaLambdaExecutionRole", // ARN of your Zappa execution role. Optional.
            ...
        },
        ...
    }

    Once your settings are configured, you can package and deploy your application to a stage called “dev” with a single command:

    $ zappa deploy dev

    Calling deploy for stage dev..
    Downloading and installing dependencies..
    Packaging project as zip.
    Uploading django-zappa-sa-dev-1526831069.zip (10.9MiB)..
    100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 11.4M/11.4M [01:02<00:00, 75.3KB/s]
    Scheduling..
    Scheduled django-zappa-sa-dev-zappa-keep-warm-handler.keep_warm_callback with expression rate(4 minutes)!
    Uploading django-zappa-sa-dev-template-1526831157.json (1.6KiB)..
    100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1.60K/1.60K [00:02<00:00, 792B/s]
    Waiting for stack django-zappa-sa-dev to create (this can take a bit)..
    100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4/4 [00:11<00:00,  2.92s/res]
    Deploying API Gateway..
    Deployment complete!: https://akg59b222b.execute-api.us-east-1.amazonaws.com/dev

    You should see that your Zappa deployment completed successfully, with the URL of the API Gateway created for your application.

    Troubleshooting

    1. If you see the following error during deployment, it’s probably because you do not have sufficient privileges to deploy to AWS Lambda. Ensure your IAM role has all the permissions described above, or set “manage_roles” to true so that Zappa can create and manage the IAM role for you.

    Calling deploy for stage dev..
    Creating django-zappa-sa-dev-ZappaLambdaExecutionRole IAM Role..
    Error: Failed to manage IAM roles!
    You may lack the necessary AWS permissions to automatically manage a Zappa execution role.
    To fix this, see here: https://github.com/Miserlou/Zappa#using-custom-aws-iam-roles-and-policies

    2. The error below is caused by not listing “events.amazonaws.com” as a trusted entity for your IAM role. You can either add it or set the “keep_warm” parameter to false in your Zappa settings file. Note that because the deployment terminated abnormally, it was only partially completed.
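    The role’s trust relationship must allow the relevant AWS services to assume it. A minimal sketch of such a trust policy, mirroring what Zappa configures when it manages the role itself (verify against your own account before using it):

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "apigateway.amazonaws.com",
                    "lambda.amazonaws.com",
                    "events.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
```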

    Downloading and installing dependencies..
    100%|████████████████████████████████████████████| 44/44 [00:05<00:00, 7.92pkg/s]
    Packaging project as zip..
    Uploading django-zappa-sample-dev-1482817370.zip (8.8MiB)..
    100%|█████████████████████████████████████████| 9.22M/9.22M [00:17<00:00, 527KB/s]
    Scheduling...
    Oh no! An error occurred! :(
    
    ==============
    
    Traceback (most recent call last):
    Traceback (most recent call last):
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 2610, in handle
        sys.exit(cli.handle())
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 505, in handle
        self.dispatch_command(self.command, stage)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 539, in dispatch_command
        self.deploy(self.vargs['zip'])
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 800, in deploy
        self.zappa.add_binary_support(api_id=api_id, cors=self.cors)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/core.py", line 1490, in add_binary_support
        restApiId=api_id
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 314, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 612, in _make_api_call
        raise error_class(parsed_response, operation_name)
    ClientError: An error occurred (ValidationError) when calling the PutRole operation: Provided role 'arn:aws:iam:484375727565:role/lambda_basic_execution' cannot be assumed by principal
    'events.amazonaws.com'.
    
    ==============
    
    Need help? Found a bug? Let us know! :D
    File bug reports on GitHub here: https://github.com/Miserlou/Zappa
    And join our Slack channel here: https://slack.zappa.io
    Love!,
    ~ Team Zappa!

    3. After adding the trusted entity (or disabling keep_warm) and running zappa update, you may hit the error below. As the log says, “Stack 'django-zappa-sa-dev' does not exist” because the previous deployment was unsuccessful. To fix this, delete the Lambda function from the console and rerun the deployment.
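    If you prefer the CLI over the console, you can delete the function created by the failed deployment (its name appears in the upload logs above) and rerun the deployment; this sketch assumes the AWS CLI is configured with the same profile:

```
$ aws lambda delete-function --function-name django-zappa-sa-dev
$ zappa deploy dev
```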

    Downloading and installing dependencies..
    100%|████████████████████████████████████████████| 44/44 [00:05<00:00, 7.92pkg/s]
    Packaging project as zip..
    Uploading django-zappa-sample-dev-1482817370.zip (8.8MiB)..
    100%|█████████████████████████████████████████| 9.22M/9.22M [00:17<00:00, 527KB/s]
    Updating Lambda function code..
    Updating Lambda function configuration..
    Uploading djangoo-zapppa-sample-dev-template-1482817403.json (1.5KiB)..
    100%|████████████████████████████████████████| 1.56K/1.56K [00:00<00:00, 6.56KB/s]
    CloudFormation stack missing, re-deploy to enable updates
    ERROR:Could not get API ID.
    Traceback (most recent call last):
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 2610, in handle
        sys.exit(cli.handle())
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 505, in handle
        self.dispatch_command(self.command, stage)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 539, in dispatch_command
        self.deploy(self.vargs['zip'])
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 800, in deploy
        self.zappa.add_binary_support(api_id=api_id, cors=self.cors)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/core.py", line 1490, in add_binary_support
        restApiId=api_id
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 314, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 612, in _make_api_call
        raise error_class(parsed_response, operation_name)
    ClientError: An error occurred (ValidationError) when calling the DescribeStackResource operation: Stack 'django-zappa-sa-dev' does not exist
    Deploying API Gateway..
    Oh no! An error occurred! :(
    
    ==============
    
    Traceback (most recent call last):
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 1847, in handle
    sys.exit(cli.handle())
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 345, in handle
    self.dispatch_command(self.command, environment)
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 379, in dispatch_command
    self.update()
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 605, in update
    endpoint_url = self.deploy_api_gateway(api_id)
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 1816, in deploy_api_gateway
    cloudwatch_metrics_enabled=self.zappa_settings[self.api_stage].get('cloudwatch_metrics_enabled', False),
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/zappa.py", line 1014, in deploy_api_gateway
    variables=variables or {}
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 251, in _api_call
    return self._make_api_call(operation_name, kwargs)
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 513, in _make_api_call
    api_params, operation_model, context=request_context)
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 566, in _convert_to_request_dict
    api_params, operation_model)
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/validate.py", line 270, in serialize_to_request
    raise ParamValidationError(report=report.generate_report())
    ParamValidationError: Parameter validation failed:
    Invalid type for parameter restApiId, value: None, type: <type 'NoneType'>, valid types: <type 'basestring'>
    
    ==============
    
    Need help? Found a bug? Let us know! :D
    File bug reports on GitHub here: https://github.com/Miserlou/Zappa
    And join our Slack channel here: https://slack.zappa.io
    Love!,
    ~ Team Zappa!

    4. If you run into a distribution error like the one below, try downgrading your pip version to 9.0.1.

    $ pip install pip==9.0.1   

    Calling deploy for stage dev..
    Downloading and installing dependencies..
    Oh no! An error occurred! :(
    
    ==============
    
    Traceback (most recent call last):
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 2610, in handle
        sys.exit(cli.handle())
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 505, in handle
        self.dispatch_command(self.command, stage)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 539, in dispatch_command
        self.deploy(self.vargs['zip'])
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 709, in deploy
        self.create_package()
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 2171, in create_package
        disable_progress=self.disable_progress
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/core.py", line 595, in create_lambda_zip
        installed_packages = self.get_installed_packages(site_packages, site_packages_64)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/core.py", line 751, in get_installed_packages
        pip.get_installed_distributions()
    AttributeError: 'module' object has no attribute 'get_installed_distributions'
    
    ==============
    
    Need help? Found a bug? Let us know! :D
    File bug reports on GitHub here: https://github.com/Miserlou/Zappa
    And join our Slack channel here: https://slack.zappa.io
    Love!,
     ~ Team Zappa!

    5. If you run into a NotFoundException (“Invalid REST API identifier specified”), try undeploying the Zappa stage and deploying again.

    Calling deploy for stage dev..
    Downloading and installing dependencies..
    Packaging project as zip.
    Uploading django-zappa-sa-dev-1526830532.zip (10.9MiB)..
    100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 11.4M/11.4M [00:42<00:00, 331KB/s]
    Scheduling..
    Scheduled django-zappa-sa-dev-zappa-keep-warm-handler.keep_warm_callback with expression rate(4 minutes)!
    Uploading django-zappa-sa-dev-template-1526830690.json (1.6KiB)..
    100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1.60K/1.60K [00:01<00:00, 801B/s]
    Oh no! An error occurred! :(
    
    ==============
    
    Traceback (most recent call last):
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 2610, in handle
        sys.exit(cli.handle())
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 505, in handle
        self.dispatch_command(self.command, stage)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 539, in dispatch_command
        self.deploy(self.vargs['zip'])
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 800, in deploy
        self.zappa.add_binary_support(api_id=api_id, cors=self.cors)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/core.py", line 1490, in add_binary_support
        restApiId=api_id
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 314, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 612, in _make_api_call
        raise error_class(parsed_response, operation_name)
    NotFoundException: An error occurred (NotFoundException) when calling the GetRestApi operation: Invalid REST API identifier specified 484375727565:akg59b222b
    
    ==============
    
    Need help? Found a bug? Let us know! :D
    File bug reports on GitHub here: https://github.com/Miserlou/Zappa
    And join our Slack channel here: https://slack.zappa.io
    Love!,
     ~ Team Zappa!

    TIP: To understand how your application works in a serverless environment, please visit this link.

    Post Deployment Setup

    Migrate database

    At this point, you should have an empty database for your Django application to fill up with a schema.

    $ zappa manage dev migrate

    Once you run the above command, the database migrations will be applied to the database specified in your Django settings.

    Creating Superuser of Django Application

    You also might need to create a new superuser in the database. You can run the following command from your project directory.

    $ zappa invoke --raw dev "from django.contrib.auth.models import User; User.objects.create_superuser('username', 'username@yourdomain.com', 'password')"

    Alternatively,

    $ python manage.py createsuperuser

    Note that your application must be connected to the same database, as this runs as a standard Django administration command (not a Zappa command).

    Managing static files

    Your Django application depends on static files; the Django admin panel, for example, uses a combination of JS, CSS, and image files.

    NOTE: Zappa is for running your application code, not for serving static web assets. If you plan on serving custom static assets in your web application (CSS/JavaScript/images/etc.), you’ll likely want to use a combination of AWS S3 and AWS CloudFront.

    You will need to add the following packages to your virtual environment for managing files to and from S3: django-storages and boto.

    $ pip install django-storages boto
    Add django-storages to your INSTALLED_APPS in settings.py:
    INSTALLED_APPS = (
    ...,
    'storages',
    )
    
    Configure django-storages in settings.py as:
    
    AWS_STORAGE_BUCKET_NAME = 'django-zappa-sample-bucket'
    AWS_S3_CUSTOM_DOMAIN = '%s.s3.amazonaws.com' % AWS_STORAGE_BUCKET_NAME
    STATIC_URL = "https://%s/" % AWS_S3_CUSTOM_DOMAIN
    STATICFILES_STORAGE = 'storages.backends.s3boto.S3BotoStorage'

    Once you have set up the Django application to serve static files from AWS S3, run the following command to upload the static files from your project to S3.

    $ python manage.py collectstatic --noinput

    or

    $ zappa update dev
    $ zappa manage dev "collectstatic --noinput"

    Check that at least 61 static files have been moved to the S3 bucket; the admin panel alone is built on those 61 static files.

    NOTE: STATICFILES_DIR must be configured properly to collect your files from the appropriate location.

    Tip: Render static assets in your templates by loading the static tag library and referencing files through it, e.g. {% static %}.

    Setting Up API Gateway

    To connect to your Django application, you also need an API Gateway set up for your AWS Lambda function. You need GET methods set up for all the URL resources used in your Django application. Alternatively, you can set up a proxy resource so that all subresources are routed through one API method.
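    With the proxy integration in place, API Gateway forwards every request to the Lambda function as a single event object. A trimmed example of the event your handler receives (field values are illustrative):

    ```json
    {
      "resource": "/{proxy+}",
      "path": "/admin/login/",
      "httpMethod": "GET",
      "headers": { "Host": "example.execute-api.us-east-1.amazonaws.com" },
      "queryStringParameters": null,
      "pathParameters": { "proxy": "admin/login/" },
      "body": null,
      "isBase64Encoded": false
    }
    ```

    Zappa's WSGI handler translates this event into a request your Django application understands.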

    Go to AWS Lambda function console and add API Gateway from ‘Add triggers’.

    1. Configure API, Deployment Stage, and Security for API Gateway. Click Save once it is done.

    2. Go to API Gateway console and,

    a. Recreate ANY method for / resource.

    i. Check `Use Lambda Proxy integration`

    ii. Set `Lambda Region` and `Lambda Function` and `Save` it.

    b. Recreate ANY method for /{proxy+} resource.

    i. Select `Lambda Function Proxy`

    ii. Set `Lambda Region` and `Lambda Function` and `Save` it.

    3. Click on Actions and select Deploy API. Set the Deployment Stage and click Deploy.

    4. Ensure that the GET and POST methods for / and /{proxy+} are set to `Override for this method`.

    Setting Up Custom SSL Endpoint

    Optionally, you can also set up a custom SSL endpoint on your own domain by installing your certificate and running Zappa's certify command.

    $ zappa certify dev
    
    ...
    "certificate_arn": "arn:aws:acm:us-east-1:xxxxxxxxxxxx:certificate/xxxxxxxxxxxx-xxxxxx-xxxx-xxxx-xxxxxxxxxxxxxx",
    "domain": "django-zappa-sample.com"

    Now you are ready to launch your Django Application hosted on AWS Lambda.

    Additional Notes:

    • Once deployed, run `zappa update <stage-name>` to update your already hosted AWS Lambda function.
    • You can inspect server logs by running the `zappa tail` command.
    • To un-deploy your application, simply run `zappa undeploy <stage-name>`.

    You’ve seen how to deploy a Django application on AWS Lambda using Zappa. If you are creating your Django application for the first time, you might also want to read Edgar Roman’s Django Zappa Guide.

    Start building your Django application and let us know in the comments if you need any help during your application deployment over AWS Lambda.

  • Implementing Federated GraphQL Microservices using Apollo Federation

    Introduction

    GraphQL has revolutionized how a client queries a server. With the thin layer of GraphQL middleware, the client has the ability to query the data more comprehensively than what’s provided by the usual REST APIs.

    One of the key principles of GraphQL involves having a single data graph of the implementing services that will allow the client to have a unified interface to access more data and services through a single query. Having said that, it can be challenging to follow this principle for an enterprise-level application on a single, monolith GraphQL server.

    The Need for Federated Services

    James Baxley III, the Engineering Manager at Apollo, puts forward the rationale behind choosing an independently managed, federated set of services very well in his talk here.

    To summarize his point, let’s consider a very complex enterprise product. Such a product would have multiple teams responsible for maintaining different modules. Now, if we’re considering implementing a GraphQL layer at the backend, it only makes sense to follow GraphQL’s one-graph principle: to maximize the value of GraphQL, we should have a single unified data graph operating at the data layer of the product. That way, a client can query a single graph and get all the data it needs without having to query different graphs for different portions of the data.

    However, it would be challenging to have all of the huge enterprise data graphs’ layer logic residing on a single codebase. In addition, we want teams to be able to independently implement, maintain, and ship different schemas of the data graph on their own release cycles.

    Though there is only one graph, the implementation of that graph should be federated across multiple teams.

    Now, let’s consider a massive enterprise e-commerce platform as an example. The different schemas of the e-commerce platform look something like:

    Fig:- E-commerce platform set of schemas

    Considering the above example, it would be a chaotic task to maintain the graph implementation logic of all these schemas on a single code base. Another overhead that this would bring is having to scale a huge monolith that’s implementing all these services. 

    Thus, one solution is a federation of services behind a single distributed data graph. Each service can be implemented independently by individual teams, who maintain their own release cycles and iterate on their services on their own. A federated set of services still follows GraphQL’s one-graph principle, which allows the client to query a single endpoint to fetch any part of the data graph.

    To further demonstrate the example above, let’s say the client asks for the top five products, their reviews, and the vendors selling them. In a typical monolith GraphQL server, this query would involve writing resolvers that mesh the data sources of these individual schemas, forcing teams to collaborate on a single implementation. Now consider a federated approach with separate services implementing products, reviews, and vendors. Each service is responsible for resolving only the part of the data graph covering its own schema and data source. This streamlines collaboration between the teams managing different schemas.

    Another advantage is scaling individual services rather than maintaining a compute-heavy monolith for a huge data graph. For example, say the products service is used the most on the platform while the vendors service is scarcely used. With a monolith, the whole server would have to be scaled. Federated services eliminate this: we can independently maintain and scale individual services like the products service.

    Federated Implementation of GraphQL Services

    A monolith GraphQL server that implements a lot of services for different schemas can be challenging to scale. Instead of implementing the complete data graph on a single codebase, the responsibilities of different parts of the data graph can be split across multiple composable services. Each one will contain the implementation of only the part of the data graph it is responsible for. Apollo Federation allows this division of services and follows a declarative programming model to allow splitting of concerns.

    Architecture Overview

    This article will not cover the basics of GraphQL, such as writing resolvers and schemas. If you’re not acquainted with the basics of GraphQL and setting up a basic GraphQL server using Apollo, I would highly recommend reading about it here. Then, you can come back here to understand the implementation of federated services using Apollo Federation.

    Apollo Federation has two principal parts to it:

    • A collection of services that distinctly define separate GraphQL schemas
    • A gateway that builds the federated data graph and acts as the front door, composing queries across the different services
    Fig:- Apollo Federation Architecture

    Separation of Concerns

    The usual way of going about implementing federated services is to split an existing monolith along the schemas already defined. Although this seems like a clean approach, it quickly causes problems when multiple schemas are involved.

    To illustrate, this is a typical way to split services from a monolith based on the existing defined Schemas:
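    A naive split, sketched from the fields discussed below, might look like:

    ```graphql
    # User service
    type User {
      id: ID!
      username: String!
      tweets: [Tweet]   # declared here, even though tweets live in the Tweet service
    }

    # Tweet service
    type Tweet {
      id: ID!
      text: String
      creator: User     # declared here, even though users live in the User service
    }
    ```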

     

    In the example above, although the tweets field belongs to the User schema, it wouldn’t make sense to populate this field in the User service. The tweets field of a User should be declared and resolved in the Tweet service itself. Similarly, it wouldn’t be right to resolve the creator field inside the Tweet service.

    The reason behind this approach is the separation of concerns. The User service might not even have access to the Tweet datastore to be able to resolve the tweets field of a user. On the other hand, the Tweet service might not have access to the User datastore to resolve the creator field of the Tweet schema.

    Considering the above schemas, each service should resolve only the fields of the schemas it owns.

    Implementation

    To illustrate Apollo Federation, we’ll consider a Node.js server built with TypeScript. The packages used are provided by the Apollo libraries:

    npm i --save apollo-server @apollo/federation @apollo/gateway

    Some additional libraries to help run the services in parallel:

    npm i --save nodemon ts-node concurrently

    Let’s go ahead and write the structure for the gateway service first. Let’s create a file gateway.ts:

    touch gateway.ts

    And add the following code snippet:

    import { ApolloServer } from 'apollo-server';
    import { ApolloGateway } from '@apollo/gateway';
    
    const gateway = new ApolloGateway({
      serviceList: [],
    });
    
    const server = new ApolloServer({ gateway, subscriptions: false });
    
    server.listen().then(({ url }) => {
      console.log(`Server ready at url: ${url}`);
    });

    Note the serviceList is an empty array for now since we’ve yet to implement the individual services. In addition, we pass the subscriptions: false option to the apollo server config because currently, Apollo Federation does not support subscriptions.

    Next, let’s add the User service in a separate file user.ts using:

    touch user.ts

    The code will go in the user service as follows:

    import { buildFederatedSchema } from '@apollo/federation';
    import { ApolloServer, gql } from 'apollo-server';
    import User from './datasources/models/User';
    import mongoStore from './mongoStore';
    
    const typeDefs = gql`
      type User @key(fields: "id") {
        id: ID!
        username: String!
      }
      extend type Query {
        users: [User]
        user(id: ID!): User
      }
      extend type Mutation {
        createUser(userPayload: UserPayload): User
      }
      input UserPayload {
        username: String!
      }
    `;
    
    const resolvers = {
      Query: {
        users: async () => {
          const allUsers = await User.find({});
          return allUsers;
        },
        user: async (_, { id }) => {
          const currentUser = await User.findOne({ _id: id });
          return currentUser;
        },
      },
      User: {
        __resolveReference: async (ref) => {
          const currentUser = await User.findOne({ _id: ref.id });
          return currentUser;
        },
      },
      Mutation: {
        createUser: async (_, { userPayload: { username } }) => {
          const user = new User({ username });
          const createdUser = await user.save();
          return createdUser;
        },
      },
    };
    
    mongoStore();
    
    const server = new ApolloServer({
      schema: buildFederatedSchema([{ typeDefs, resolvers }]),
    });
    
    server.listen({ port: 4001 }).then(({ url }) => {
      console.log(`User service ready at url: ${url}`);
    });

    Let’s break down the code that went into the User service.

    Consider the User schema definition:

    type User @key(fields: "id") {
       id: ID!
       username: String!
    }

    The @key directive tells other services that the User schema is, in fact, an entity that can be extended within other services. The fields argument lets other services uniquely identify individual instances of the User entity by id.

    The Query and the Mutation types need to be extended by all implementing services, according to the Apollo Federation documentation, since they are always defined at the gateway level.

    As a side note, the User model imported from ./datasources/models/User is essentially a Mongoose model for MongoDB that handles all the CRUD operations for the User entity in a MongoDB database. In addition, the mongoStore() function is responsible for establishing a connection to the MongoDB database server.

    The User model implementation in Mongoose looks something like this:

    export const UserSchema = new Schema({
      username: {
        type: String,
      },
    });
    
    export default mongoose.model(
      'User',
      UserSchema
    );

    In the Query type, the users and the user(id: ID!) queries fetch a list or the details of individual users.

    In the resolvers, we define a __resolveReference function that returns an instance of the User entity to any other implementing service that only holds a reference id of a User and needs the full entity. The ref parameter is an object of the shape { id: 'userEntityId' }, passed down from services that need a User reference resolved by its id. Internally, we fire a Mongoose findOne query to return the matching User instance from the users database. To illustrate the resolver:

    User: {
        __resolveReference: async (ref) => {
          const currentUser = await User.findOne({ _id: ref.id });
          return currentUser;
        },
      },

    At the end of the file, we make sure the service runs on a unique port, 4001, which we pass as an option while starting the Apollo server. That concludes the User service.

    Next, let’s add the tweet service by creating a file tweet.ts using:

    touch tweet.ts

    The following code goes as a part of the tweet service:

    import { buildFederatedSchema } from '@apollo/federation';
    import { ApolloServer, gql } from 'apollo-server';
    import Tweet from './datasources/models/Tweet';
    import TweetAPI from './datasources/tweet';
    import mongoStore from './mongoStore';
    
    const typeDefs = gql`
      type Tweet {
        text: String
        id: ID!
        creator: User
      }
      extend type User @key(fields: "id") {
        id: ID! @external
        tweets: [Tweet]
      }
      extend type Query {
        tweet(id: ID!): Tweet
        tweets: [Tweet]
      }
      extend type Mutation {
        createTweet(tweetPayload: TweetPayload): Tweet
      }
      input TweetPayload {
        userId: String
        text: String
      }
    `;
    
    const resolvers = {
      Query: {
        tweet: async (_, { id }) => {
          const currentTweet = await Tweet.findOne({ _id: id });
          return currentTweet;
        },
        tweets: async () => {
          const tweetsList = await Tweet.find({});
          return tweetsList;
        },
      },
      Tweet: {
        creator: (tweet) => ({ __typename: 'User', id: tweet.userId }),
      },
      User: {
        tweets: async (user) => {
          const tweetsByUser = await Tweet.find({ userId: user.id });
          return tweetsByUser;
        },
      },
      Mutation: {
        createTweet: async (_, { tweetPayload: { text, userId } }) => {
          const newTweet = new Tweet({ text, userId });
          const createdTweet = await newTweet.save();
          return createdTweet;
        },
      },
    };
    
    mongoStore();
    
    const server = new ApolloServer({
      schema: buildFederatedSchema([{ typeDefs, resolvers }]),
    });
    
    server.listen({ port: 4002 }).then(({ url }) => {
      console.log(`Tweet service ready at url: ${url}`);
    });

    Let’s break down the Tweet service as well:

    type Tweet {
       text: String
       id: ID!
       creator: User
    }

    The Tweet schema has a text field, which is the content of the tweet; a unique id; and a creator field, which is of the User entity type and resolves to the details of the user that created the tweet:

    extend type User @key(fields: "id") {
       id: ID! @external
       tweets: [Tweet]
    }

    We extend the User entity schema in this service; its id field carries an @external directive. This tells the Tweet service that, given the id field of a User, the actual instance of the User entity must be derived from another service (the User service in this case).

    As we discussed previously, the tweets field of the extended User schema for the user entity should be resolved in the Tweet service since all the resolvers and access to the data sources with respect to the Tweets entity resides in this service.

    The Query and Mutation types of the Tweet service are pretty straightforward; we have tweets and tweet(id: ID!) queries to resolve a list of Tweet entities or an individual instance.

    Let’s further break down the resolvers:

    Tweet: {
       creator: (tweet) => ({ __typename: 'User', id: tweet.userId }),
    },

    To resolve the creator field of the Tweet entity, the Tweet service needs to tell the gateway that this field will be resolved by the User service. Hence, we pass the id of the User and a __typename for the gateway to be able to call the right service to resolve the User entity instance. In the User service earlier, we wrote a  __resolveReference resolver, which will resolve the reference of a User based on an id.

    User: {
       tweets: async (user) => {
           const tweetsByUser = await Tweet.find({ userId: user.id });
           return tweetsByUser;
       },
    },

    Now, we need to resolve the tweets field of the User entity extended in the Tweet service. We write a resolver that receives the parent User entity reference as its first argument and uses its id to fire a Mongoose query returning all the tweets created by that user.

    At the end of the file, similar to the User service, we make sure the Tweet service runs on a different port by adding the port: 4002 option to the Apollo server config. That concludes both our implementing services.

    Now that we have our services ready, let’s update our gateway.ts file to reflect the added services:

    import { ApolloServer } from 'apollo-server';
    import { ApolloGateway } from '@apollo/gateway';
    
    const gateway = new ApolloGateway({
      serviceList: [
          { name: 'users', url: 'http://localhost:4001' },
          { name: 'tweets', url: 'http://localhost:4002' },
        ],
    });
    
    const server = new ApolloServer({ gateway, subscriptions: false });
    
    server.listen().then(({ url }) => {
      console.log(`Server ready at url: ${url}`);
    });

    We’ve added two services to the serviceList with a unique name to identify each service followed by the URL they are running on.

    Next, let’s make some small changes to the package.json file to make sure the services and the gateway run in parallel:

    "scripts": {
        "start": "concurrently -k npm:server:*",
        "server:gateway": "nodemon --watch 'src/**/*.ts' --exec 'ts-node' src/gateway.ts",
        "server:user": "nodemon --watch 'src/**/*.ts' --exec 'ts-node' src/user.ts",
        "server:tweet": "nodemon --watch 'src/**/*.ts' --exec 'ts-node' src/tweet.ts"
      },

    The concurrently library helps run the 3 scripts in parallel. The server:* scripts spin up dev servers using nodemon, which watches and reloads the servers on changes, and ts-node, which executes the TypeScript entry points with Node.

    Let’s spin up our server:

    npm start

    On visiting http://localhost:4000, you should see the GraphQL query playground for the running Apollo server:

    Querying and Mutation from the Client

    Initially, let’s fire some mutations to create two users and some tweets by those users.

    Mutations

    First, fire a mutation in the GraphQL playground to create a user with the username “@elonmusk”; it returns the id of the user:
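    Matching the User service schema above, the mutation looks like:

    ```graphql
    mutation {
      createUser(userPayload: { username: "@elonmusk" }) {
        id
        username
      }
    }
    ```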

     

    Create another user named “@billgates” the same way and take note of its id.

    Now that we have two users, let’s fire some mutations to create tweets by those users. Here is a simple mutation that creates a tweet by the user “@elonmusk”:
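    Based on the Tweet service schema, the mutation looks like this (substitute the userId placeholder with the id returned when creating “@elonmusk”):

    ```graphql
    mutation {
      createTweet(tweetPayload: { userId: "<id-of-@elonmusk>", text: "I own Tesla" }) {
        id
        text
      }
    }
    ```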

    Here is another mutation that creates a tweet by the user “@billgates”.

    After adding a couple of those, we are good to fire our queries, which will allow the gateway to compose the data by resolving fields through different services.

    Queries

    Initially, let’s list all the tweets along with their creator, which is of type User. The query will look something like:

    {
     tweets {
       text
       creator {
         username
       }
     }
    }

    When the gateway encounters a query asking for tweet data, it forwards that query to the Tweet service, since the Tweet service extends the Query type with a tweets query defined in it.

    On encountering the creator field of the tweet schema, which is of the type User, the creator resolver within the Tweet service is invoked. This is essentially just passing a __typename and an id, which tells the gateway to resolve this reference from another service.

    In the User service, we have a __resolveReference function, which returns the complete instance of a user given its id passed from the Tweet service. It also serves any other implementing service that needs a User entity reference resolved.
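    Under the hood, the gateway fetches such references through the _entities root field that every federated service exposes. The follow-up query it sends to the User service looks roughly like this (variable values are illustrative):

    ```graphql
    query ($representations: [_Any!]!) {
      _entities(representations: $representations) {
        ... on User {
          username
        }
      }
    }
    ```

    The representations variable carries the objects the Tweet service produced, e.g. { "__typename": "User", "id": "<user-id>" }.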

    On firing the query, the response should look something like:

    {
      "data": {
        "tweets": [
          {
            "text": "I own Tesla",
            "creator": {
              "username": "@elonmusk"
            }
          },
          {
            "text": "I own SpaceX",
            "creator": {
              "username": "@elonmusk"
            }
          },
          {
            "text": "I own PayPal",
            "creator": {
              "username": "@elonmusk"
            }
          },
          {
            "text": "I own Microsoft",
            "creator": {
              "username": "@billgates"
            }
          },
          {
            "text": "I own XBOX",
            "creator": {
              "username": "@billgates"
            }
          }
        ]
      }
    }

    Now, let’s try it the other way round. Let’s list all users and add the field tweets that will be an array of all the tweets created by that user. The query should look something like:

    {
     users {
       username
       tweets {
         text
       }
     }
    }

    When the gateway encounters the query of type users, it passes down that query to the user service. The User service is responsible for resolving the username field of the query.

    On encountering the tweets field of the users query, the gateway checks if any other implementing service has extended the User entity and has a resolver written within the service to resolve any additional fields of the type User.

    The Tweet service has extended the type User and has a resolver for the User type to resolve the tweets field, which will fetch all the tweets created by the user given the id of the user.

    On firing the query, the response should be something like:

    {
      "data": {
        "users": [
          {
            "username": "@elonmusk",
            "tweets": [
              {
                "text": "I own Tesla"
              },
              {
                "text": "I own SpaceX"
              },
              {
                "text": "I own PayPal"
              }
            ]
          },
          {
            "username": "@billgates",
            "tweets": [
              {
                "text": "I own Microsoft"
              },
              {
                "text": "I own XBOX"
              }
            ]
          }
        ]
      }
    }

    Conclusion

    Scaling an enterprise data graph on a monolithic GraphQL service brings a lot of challenges. The ability to distribute the data graph across implementing services that can be individually maintained and scaled using Apollo Federation helps quell those concerns.

    There are further advantages to federated services. Considering our example above, we could use two different kinds of datastores for the User and the Tweet services. While the User data could reside in a NoSQL database like MongoDB, the Tweet data could live in a SQL database like Postgres or MySQL. This is easy to implement, since each service is responsible for resolving references only for the type it owns.

    Final Thoughts

    One of the key advantages of having different services that can be maintained individually is the ability to deploy each service separately. In addition, this also enables deployment of different services independently to different platforms such as Firebase, Lambdas, etc.

    A single monolith GraphQL server deployed on an instance or a single serverless platform can have some challenges with respect to scaling an instance or handling high concurrency as mentioned above.

    By splitting out the services, we could have a separate serverless function for each implementing service that can be maintained or scaled individually and also a separate function on which the gateway can be deployed.

    One popular use of GraphQL federation can be seen in this Netflix Technology blog, where they explain how they solved a bottleneck with the GraphQL APIs in Netflix Studio. They created a federated GraphQL microservices architecture, along with a schema store, using Apollo Federation. This solution gave them a unified schema with distributed ownership and implementation.

  • Lessons Learnt While Building an ETL Pipeline for MongoDB & Amazon Redshift Using Apache Airflow

    Recently, I was involved in building an ETL (Extract-Transform-Load) pipeline. It involved extracting data from MongoDB collections, performing transformations, and then loading the results into Redshift tables. Many ETL solutions on the market partially solve the problem, but the key part of an ETL process lies in its ability to transform or process raw data before it is pushed to its destination.

    Each ETL pipeline comes with specific business requirements around processing data, which are hard to achieve using off-the-shelf ETL solutions. This is why a majority of ETL solutions are custom built from scratch. In this blog, I am going to talk about my learnings from building a custom ETL solution that moves data from MongoDB to Redshift using Apache Airflow.

    Background:

    I began by writing a Python-based command line tool which supported different phases of ETL, like extracting data from MongoDB, processing extracted data locally, uploading the processed data to S3, loading data from S3 to Redshift, post-processing and cleanup. I used the PyMongo library to interact with MongoDB and the Boto library for interacting with Redshift and S3.

    I kept each operation atomic so that multiple instances of each operation can run independently of one another, which helps achieve parallelism. One of the major challenges was achieving this parallelism while running the ETL tasks. One option was to develop our own thread-based framework, or to build a distributed task scheduler using a task queue like Celery backed by a message broker like RabbitMQ. After doing some research, I settled on Apache Airflow. Airflow is a Python-based scheduler where you can define DAGs (Directed Acyclic Graphs) that run on a given schedule and execute tasks in parallel within each phase of your ETL. You define a DAG as Python code, and Airflow also enables you to handle the state of your DAG runs using variables. Features like task retries on failure are a plus.

    We faced several challenges in making the above ETL workflow near real-time and fault tolerant. The challenges and their solutions are discussed below:

    Keeping your ETL code changes in sync with Redshift schema

    While building the ETL tool, you may start fetching a new field from MongoDB; at the same time, you have to add the corresponding column to the Redshift table. If you fail to do so, the ETL pipeline will start failing. To tackle this, I created a database migration tool that became the first step in my ETL workflow.

    The migration tool would:

    • keep the migration status in a Redshift table, and
    • track all migration scripts in a code directory.

    In each ETL run, it gets the most recently run migrations from Redshift and searches the code directory for any new migration scripts. If any are found, it runs them before the regular ETL tasks. This puts the onus on the developer to add a migration script whenever they make a change, such as adding or removing a field fetched from MongoDB.
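
    The selection logic at the heart of such a migration step is small. A sketch (the file names are hypothetical; `applied` would come from the Redshift migration-status table and `available` from the code directory):

```python
def pending_migrations(applied, available):
    """Return migration scripts present in the code directory but not yet
    recorded in the Redshift migration-status table, in execution order.

    Assumes scripts carry a sortable numeric prefix, e.g.
    001_create_users.sql, 002_add_email_column.sql.
    """
    return sorted(set(available) - set(applied))

# Example: Redshift says 001 already ran; the directory has a new script.
print(pending_migrations(
    applied=["001_create_users.sql"],
    available=["001_create_users.sql", "002_add_email_column.sql"],
))
```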

    Maintaining data consistency

    While extracting data from MongoDB, one needs to ensure that all collections are extracted as of a specific point in time; otherwise, there can be data inconsistency issues. This problem needs to be solved at multiple levels:

    • While extracting data from MongoDB, pick a reference point such as a modified date, and extract from every collection with a filter that selects only records whose modified date is less than or equal to that reference. This ensures you fetch point-in-time data from MongoDB.
    • While loading data into Redshift tables, don’t load directly into the master tables; load into staging tables instead. Once the staging load is done for all related collections, promote the data from staging to master within a single transaction. That way, the data is either updated in all related tables or in none of them.
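
    A sketch of the single-transaction promotion (table and key names are hypothetical; the delete-then-insert upsert is one common pattern, and Redshift's TRUNCATE commits implicitly, so it is kept outside the transaction):

```python
def staging_to_master_sql(table_pairs, key="id"):
    """Build one transaction that promotes staging tables to master.

    Either every master table sees the new data or none does.
    Staging tables are cleared only after the transaction commits,
    because Redshift's TRUNCATE auto-commits.
    """
    stmts = ["BEGIN;"]
    for master, staging in table_pairs:
        # Upsert pattern: delete rows being replaced, then insert fresh ones.
        stmts.append(
            f"DELETE FROM {master} USING {staging} "
            f"WHERE {master}.{key} = {staging}.{key};"
        )
        stmts.append(f"INSERT INTO {master} SELECT * FROM {staging};")
    stmts.append("COMMIT;")
    stmts += [f"TRUNCATE {staging};" for _, staging in table_pairs]
    return "\n".join(stmts)

print(staging_to_master_sql([("users", "users_staging")]))
```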

    A single bad record can break your ETL

    While moving data across the ETL pipeline into Redshift, one needs to take care of field formats. For example, the date format in the incoming data can differ from the one in the Redshift schema. Another example: incoming values can exceed the length of the corresponding field in the schema. Redshift’s COPY command, which is used to load data from files into Redshift tables, is very vulnerable to such variations in the data. Even a single incorrectly formatted record will cause all of your data to be rejected, effectively breaking the ETL pipeline.

    There are multiple ways to solve this problem. One is to handle it in one of the transform jobs in the pipeline. Alternatively, we can put the onus on Redshift to handle these variances: the COPY command has many options that help with exactly these problems. Some of the most useful ones are:

    • ACCEPTANYDATE: Allows any date format, including invalid formats such as 00/00/00 00:00:00, to be loaded without generating an error.
    • ACCEPTINVCHARS: Enables loading of data into VARCHAR columns even if the data contains invalid UTF-8 characters.
    • TRUNCATECOLUMNS: Truncates data in columns to the appropriate number of characters so that it fits the column specification.
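
    Those options combine into a COPY statement like the following (the table name, S3 path, role ARN, and JSON format are hypothetical placeholders):

```python
def build_copy_sql(table, s3_prefix, iam_role):
    """Compose a Redshift COPY with the defensive options above.

    The S3 path and IAM role here are placeholders; authentication can
    alternatively use ACCESS_KEY_ID/SECRET_ACCESS_KEY instead of IAM_ROLE.
    """
    return (
        f"COPY {table} FROM '{s3_prefix}' "
        f"IAM_ROLE '{iam_role}' "
        "FORMAT AS JSON 'auto' "
        # Tolerate bad dates, invalid UTF-8, and over-long strings
        # instead of failing the entire load on one bad record.
        "ACCEPTANYDATE ACCEPTINVCHARS TRUNCATECOLUMNS;"
    )

print(build_copy_sql(
    "users",
    "s3://my-etl-bucket/users/",
    "arn:aws:iam::123456789012:role/RedshiftCopy",
))
```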

    Redshift going out of storage

    Redshift is based on PostgreSQL, and one common problem is that deleting records from Redshift tables does not actually free up space. So, if your ETL process deletes and creates records frequently, you may run out of Redshift storage. Redshift’s VACUUM operation is the solution to this problem. Instead of making VACUUM part of your main ETL flow, define a separate workflow on its own schedule to run it. VACUUM reclaims space and re-sorts rows in either a specified table or all tables in the current database, and it can run as FULL, SORT ONLY, DELETE ONLY, or REINDEX. More information on VACUUM can be found here.
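
    The separate maintenance workflow only needs to issue statements of this shape. A small helper, assuming the returned SQL is executed through whatever client the ETL already uses (table name in the example is hypothetical):

```python
VALID_MODES = {"FULL", "SORT ONLY", "DELETE ONLY", "REINDEX"}

def vacuum_sql(mode="FULL", table=None):
    """Build a VACUUM statement for the scheduled maintenance workflow.

    With no table given, Redshift vacuums every table in the current
    database; REINDEX, however, requires an explicit table name.
    """
    if mode not in VALID_MODES:
        raise ValueError(f"unknown VACUUM mode: {mode}")
    if mode == "REINDEX" and table is None:
        raise ValueError("VACUUM REINDEX requires a table name")
    return f"VACUUM {mode} {table};" if table else f"VACUUM {mode};"

print(vacuum_sql())
print(vacuum_sql("DELETE ONLY", "users"))
```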

    ETL instance going out of storage

    Your ETL will generate a lot of files while extracting data from MongoDB onto your ETL instance. It is very important to delete those files periodically; otherwise, you are very likely to run out of storage on your ETL server, especially if your MongoDB data is huge and produces large files. Again, I would recommend defining a separate workflow on its own schedule to run the cleanup.
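
    The cleanup task itself can be as simple as an age-based purge. A sketch (the age threshold is an arbitrary example, not a recommendation from the original post):

```python
import os
import time

def purge_old_files(directory, max_age_hours=24):
    """Delete extract files older than the cutoff; return what was removed.

    Meant to run as its own scheduled workflow, not inside the main ETL,
    so a slow disk sweep never delays a load.
    """
    cutoff = time.time() - max_age_hours * 3600
    removed = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if os.path.isfile(path) and os.path.getmtime(path) < cutoff:
            os.remove(path)
            removed.append(name)
    return removed
```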

    Making ETL Near Real Time

    Processing only the delta rather than doing a full load in each ETL run

    ETL is faster if you keep track of already-processed data and process only what is new. If you do a full load in each ETL run, the solution will not scale as your data grows. As a solution, we made it mandatory for every collection in our MongoDB to have a created date and a modified date. Our ETL checks the maximum value of the modified date for the given collection in the Redshift table, then generates a filter query that fetches only those records from MongoDB whose modified date is greater than that maximum. It may be difficult for you to make such changes in your product, but it’s worth the effort!
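
    The filter construction is a one-liner once the high-water mark is known. A sketch (the field name `modified_date` mirrors the convention above; the MAX() value would come from a query against the Redshift table):

```python
from datetime import datetime

def delta_filter(last_loaded):
    """Build the MongoDB query filter for an incremental extract.

    `last_loaded` is MAX(modified_date) read from the Redshift table;
    None means no prior load has happened, so do a full extract.
    """
    if last_loaded is None:
        return {}
    return {"modified_date": {"$gt": last_loaded}}

# Usage with PyMongo would be: collection.find(delta_filter(high_water_mark))
print(delta_filter(datetime(2020, 1, 1)))
```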

    Compressing and splitting files while loading

    A good approach is to write files in a compressed format. It saves storage space on the ETL server and also helps when you load data into Redshift: the COPY command documentation suggests providing compressed files as input. Also, instead of one huge file, split your data into parts and give all the parts to a single COPY command. This enables Redshift to use its computing resources across the cluster to copy in parallel, leading to faster loads.
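
    Both ideas together can be sketched with a small writer that splits records across gzipped part files sharing a common prefix, so one COPY with that prefix loads all parts in parallel (the tiny records-per-file value is only for the demo):

```python
import gzip
import os

def write_split_gzip(records, out_dir, prefix, records_per_file=2):
    """Write newline-delimited records into several gzipped part files.

    A single `COPY table FROM 's3://bucket/<prefix>' GZIP ...` then picks
    up every part, and Redshift distributes the load across the cluster.
    """
    paths = []
    for part, start in enumerate(range(0, len(records), records_per_file)):
        path = os.path.join(out_dir, f"{prefix}.part{part}.gz")
        with gzip.open(path, "wt") as f:
            for rec in records[start:start + records_per_file]:
                f.write(rec + "\n")
        paths.append(path)
    return paths
```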

    Streaming mongo data directly to S3 instead of writing it to ETL server

    One of the major overheads in the ETL process is writing data to the ETL server first and then uploading it to S3. To reduce disk IO, don’t store the data on the ETL server at all; use MongoDB’s handy stream API instead. In the MongoDB Node driver, both collection.find() and collection.aggregate() return cursors, and the cursor’s stream method accepts a transform function as a parameter, which is where all your custom transform logic can go. AWS S3’s Node library’s upload() function also accepts readable streams. So: take the stream from the MongoDB stream method, pipe it into zlib to gzip it, then feed the resulting readable stream to S3’s upload(). Simple! This simple but important change yields a large improvement in the ETL process.
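
    The post describes the Node driver; an equivalent sketch in Python consumes a PyMongo cursor one document at a time into a gzip buffer that boto3's `upload_fileobj` can take directly (bucket, key, and the transform are hypothetical; a truly unbounded stream would use a multipart upload instead of one buffer):

```python
import gzip
import io
import json

def gzip_jsonl(cursor, transform=lambda doc: doc):
    """Gzip documents from a (Py)Mongo cursor into a file-like object.

    Documents are consumed one at a time, so the uncompressed dataset is
    never materialized on disk; the returned object can be handed
    straight to boto3: s3.upload_fileobj(buf, bucket, key).
    """
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
        for doc in cursor:
            # Custom transform logic plugs in here, analogous to the
            # Node driver's stream transform function.
            gz.write((json.dumps(transform(doc)) + "\n").encode("utf-8"))
    buf.seek(0)
    return buf
```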

    Optimizing Redshift Queries

    Optimizing Redshift queries helps make the ETL system highly scalable and efficient, and it also reduces cost. Let’s look at some of the approaches:

    Add a distribution key

    A Redshift database is clustered, meaning your data is stored across cluster nodes. When you query for a certain set of records, Redshift has to search for them on each node, which makes queries slow. A distribution key is a single column that decides how the records of your tables are distributed across the cluster. If you have a column that is present in all of your data, you can specify it as the distribution key; when loading data, all rows with a given value of the distribution key are placed on a single node of the Redshift cluster. Then, when you query for certain records, Redshift knows exactly where to look. Note that this only helps when you also use the distribution key in your queries.
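
    A hypothetical DDL illustrating the idea (the table and columns are illustrative, not from the original pipeline):

```python
# Distributing on customer_id places all of a customer's events on one
# node, so joins and filters keyed on customer_id stay node-local
# instead of shuffling data across the cluster.
events_ddl = """
CREATE TABLE events (
    event_id    BIGINT,
    customer_id BIGINT DISTKEY,
    occurred_at TIMESTAMP,
    payload     VARCHAR(4096)
);
""".strip()

print(events_ddl)
```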

    Generating a numeric primary key for a string primary key

    In MongoDB, any type of field can be the primary key. If your Mongo collections have non-numeric primary keys and you reuse those same keys in Redshift, your joins end up being on string keys, which are slower. Instead, generate numeric keys for your string keys and join on those, which makes queries run much faster. Redshift supports marking a column with the IDENTITY attribute, which auto-generates a unique numeric value that you can use as your primary key.
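
    A hypothetical DDL for this pattern (column names are illustrative; the 24-character VARCHAR assumes Mongo's hex ObjectId string):

```python
# user_key is auto-generated by Redshift and used for joins, while the
# original Mongo ObjectId string is kept only for cross-referencing.
users_ddl = """
CREATE TABLE users (
    user_key BIGINT IDENTITY(1, 1),
    mongo_id VARCHAR(24),
    email    VARCHAR(256)
);
""".strip()

print(users_ddl)
```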

    Conclusion:

    In this blog, I have covered best practices for building ETL pipelines for Redshift, based on my learnings. There are many more recommended practices, which can easily be found in the Redshift and MongoDB documentation.

  • Prow + Kubernetes – A Perfect Combination To Execute CI/CD At Scale

    Intro

    Kubernetes is currently the hottest, and increasingly the standard, way of deploying workloads in the cloud. It’s well-suited for companies and vendors that need self-healing, high availability, cloud-agnostic behavior, and easy extensibility.

    Now, on another front, a problem has arisen within the CI/CD domain. Since people are using Kubernetes as the underlying orchestrator, they need a robust CI/CD tool that is entirely Kubernetes-native.

    Enter Prow

    Prow complements the Kubernetes family in the realm of automation and CI/CD.

    In fact, it is the only project that best exemplifies why and how Kubernetes is such a superb platform to execute CI/CD at scale.

    Prow (meaning: the portion of a ship’s bow, the front end that’s above water) is a Kubernetes-native CI/CD system, and over the past few years it has been used by many projects like Kyma, Istio, Kubeflow, OpenShift, etc.

    Where did it come from?

    Kubernetes is one of the largest and most successful open-source projects on GitHub. At the time of Prow’s conception, the Kubernetes community was trying hard to keep its head above water in matters of CI/CD. Their needs included executing more than 10,000 CI/CD jobs per day, spanning 100+ repositories across various GitHub organizations, and other automation stacks simply could not handle that scale.

    So, the Kubernetes Testing SIG created their own tools to complement Prow. Because Prow currently resides under the Kubernetes test-infra project, one might underestimate its true prowess. I personally would like to see Prow get a dedicated repo, out from under the umbrella of test-infra.

    What is Prow?

    Prow is not too complex to understand but still vast in a subtle way. It is designed and built on a distributed microservice architecture native to Kubernetes.

    It has many components that integrate with one another (plank, hook, etc.) and a bunch of standalone ones that are more of a plug-n-play nature (trigger, config-updater, etc.).

    For the context of this blog, I will not be covering Prow’s entire architecture, but feel free to dive into it on your own later. 

    Just to name the main building blocks for Prow:

    • Hook – acts as an API gateway that intercepts all requests from GitHub; it creates a Prow job custom resource from the job configuration and calls any relevant plugins.
    • Plank – the Prow job controller; after Hook creates a Prow job, Plank processes it and creates a Kubernetes pod to run the tests.
    • Deck – serves as the UI, showing jobs that ran in the past or are currently running.
    • Horologium – the component that processes periodic jobs.
    • Sinker – responsible for cleaning up old jobs and pods from the cluster.

    More can be found here: Prow Architecture. Note that this link is not the official doc from Kubernetes but from another great open source project that uses Prow extensively day-in-day-out – Kyma.

    This is how Prow can be visualized:


     

     

    Here is a list of things Prow can do and why it was conceived in the first place.

    • GitHub Automation on a wide range

      – ChatOps via slash commands like “/foo”
      – Fine-tuned policies and permission management in GitHub via OWNERS files
      – tide – PR/merge automation
      – ghProxy – a GitHub API request cache, to avoid hitting API rate limits
      – label plugin – label management
      – branchprotector – branch protection configuration
      – releasenote – release notes management
    • Job Execution engine – Plank‍
    • Job status Reporting to CI/CD dashboard – crier‍
    • Dashboards for comprehensive job/PR history, merge status, real-time logs, and other statuses – Deck‍
    • Plug-n-play service to interact with GCS and show job artifacts on dashboard – Spyglass‍
    • Super easy pluggable Prometheus stack for observability – metrics‍
    • Config-as-Code for Prow itself – updateconfig‍
    • And many more, like sinker, branch protector, etc.

    Possible Jobs in Prow

    Here, a job means any “task that is executed on a trigger.” The trigger can be anything from a GitHub commit or a new PR to a periodic cron schedule. Possible jobs in Prow include:

    • Presubmit – triggered when a new GitHub PR is created.
    • Postsubmit – triggered when there is a new commit.
    • Periodic – triggered on a specific cron schedule.
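
    For comparison with the presubmit config shown further below, a periodic job is configured in much the same shape (the job name, schedule, and command here are hypothetical):

```yaml
periodics:
- name: ci-nightly-e2e            # hypothetical job name
  interval: 24h                   # alternatively: cron: "0 2 * * *"
  decorate: true
  spec:
    containers:
    - image: golang:1.12.5
      command:
      - /bin/bash
      args:
      - -c
      - "make e2e-test"
```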

    Possible states for a job

    • triggered – a new Prow-job custom resource has been created from the job configs
    • pending – a pod is created in response to the Prow-job to run the scripts/tests; the Prow-job stays pending while the pod is being created and running
    • success – if the pod succeeds, the Prow-job status changes to success
    • failure – if the pod fails, the Prow-job status is marked failure
    • aborted – if a job is retriggered while it is still running, the first Prow-job execution is aborted (its status changes to aborted) and the new one is marked pending

    What a job config looks like:

    presubmits:
      kubernetes/community:
      - name: pull-community-verify  # convention: (job type)-(repo name)-(suite name)
        branches:
        - master
        decorate: true
        always_run: true
        spec:
          containers:
          - image: golang:1.12.5
            command:
            - /bin/bash
            args:
            - -c
            - "export PATH=$GOPATH/bin:$PATH && make verify"

    • Here, this job is a “presubmit” type, meaning it will be executed when a PR is created against the “master” branch in repo “kubernetes/community”.
    • As shown in the spec, a pod will be created from the golang:1.12.5 image, the repo will be cloned into it, and the given command will be executed when the container starts.
    • The output of that command will decide if the pod has succeeded or failed, which will, in turn, decide if the Prow job has successfully completed.

    More jobs configs used by Kubernetes itself can be found here – Jobs

    Getting a minimalistic Prow cluster up and running on your local system in minutes

    Pre-reqs:

    • Knowledge of Kubernetes 
    • Knowledge of Google Cloud and IAM

    For the context of this blog, I have created a sample GitHub repo containing all the basic manifest files and config files, with basic CI already configured for it. Feel free to clone/fork it and use it as a getting-started guide.

    Let’s look at the directory structure for the repo:

    .
    ├── docker/     # Contains docker image in which all the CI jobs will run
    ├── hack/       # Contains small hack scripts used in a wide range of jobs 
    ├── hello.go
    ├── hello_test.go
    ├── Dockerfile
    ├── Makefile
    ├── prow
    │   ├── cluster/       # Install prow on k8s cluster
    │   ├── jobs/          # CI jobs config
    │   ├── labels.yaml    # Prow label config for managing github labels
    │   ├── config.yaml    # Prow config
    │   └── plugins.yaml   # Prow plugins config
    └── README.md

    1. Create a bot account. For info, look here. Add this bot as a collaborator in your repo. 

    2. Create an OAuth2 token from the GitHub GUI for the bot account.

    $ echo "PUT_TOKEN_HERE" > oauth
    $ kubectl create secret generic oauth --from-file=oauth=oauth

    3. Generate an HMAC token (via openssl) for Hook to use when validating GitHub webhooks.

    $ openssl rand -hex 20 > hmac
    $ kubectl create secret generic hmac --from-file=hmac=hmac

    4. Install all the Prow components mentioned in prow-starter.yaml.

    $ make deploy-prow

    5. Update all the jobs and plugins needed for the CI (rules mentioned in the Makefile). Use commands:

    • Updates in plugins.yaml and presubmits.yaml:
      – Change the repo name (velotio-tech/k8s-prow-guide) to the repo for which the jobs are being configured
    • Updates in config.yaml:
      – Create a GCS bucket
      – Update the name of the GCS bucket (GCS_BUCKET_NAME) in config.yaml
      – Create a service account with GCS storage permission and download its service_account.json
      – Create a secret from the service_account.json above:

    $ kubectl create secret generic gcs-sa --from-file=service-account.json=service-account.json

      – Update the secret name (GCS_SERVICE_ACC) in config.yaml

    Then apply the changes:

    $ make update-config
    $ make update-plugins
    $ make update-jobs

    6. To expose a webhook from the GitHub repo and point it at your local machine, install and use Ultrahook. It gives you a publicly accessible endpoint; in my case, the result looked like this: http://github.sanster23.ultrahook.com.

    $ echo "api_key: <API_KEY_ULTRAHOOK>" > ~/.ultrahook
    $ ultrahook github http://<MINIKUBE_IP>:<HOOK_NODE_PORT>/hook

    7. Create a webhook in your repo so that all events can be published to Hook via the public URL above:

    • Set the webhook URL from Step 6
    • Set Content Type as application/json
    • Set the value of the secret token to the same hmac token created in Step 3
    • Check the “Send me everything” box

    8. Create a new PR and see the magic.

    9. The Prow dashboard will be accessible at http://<MINIKUBE_IP>:<DECK_NODE_PORT>

    • MINIKUBE_IP: 192.168.99.100 (run “minikube ip”)
    • DECK_NODE_PORT: 32710 (run “kubectl get svc deck”)

    I will leave you with an official reference for the Prow dashboard.

    What’s Next

    The above is an effort to give you a taste of what Prow can do and how easy it is to set up, at any scale of infrastructure and for a project of any complexity.

    P.S. – Content about Prow is scarce, leaving it a bit unexplored in certain ways, but I found the #prow channel on the Kubernetes Slack helpful. Hopefully, this post helps you explore the uncharted waters of Kubernetes-native CI/CD.