Tag: python

  • Policy Insights: Chatbots and RAG in Health Insurance Navigation

    Introduction

Understanding health insurance policies can be complicated, leaving individuals to wade through lengthy, difficult documents. The complexity of these policies’ language not only adds to the confusion but also leaves policyholders uncertain about the actual extent of their coverage, the best plan for their needs, and how to get answers to their specific policy-related questions. In response to these ongoing challenges, and to improve access to information, this article explores a fresh approach to how individuals engage with their health insurance policies.

    Challenges in Health Insurance Communication

    Health insurance queries are inherently complex, often involving nuanced details that require precision. Traditional chatbots, lacking the finesse of generative AI (GenAI), struggle to handle the intricacies of healthcare-related questions. The envisioned health insurance chatbot powered by GenAI overcomes these limitations, offering a sophisticated understanding of queries and delivering responses that align with the complexities of the healthcare sphere.

    Retrieval-Augmented Generation

Retrieval-augmented generation, or RAG, is an architectural approach that can improve the efficacy of large language model (LLM) applications by leveraging custom data. This is done by retrieving documents relevant to a question or task and providing them as context to the LLM. RAG has shown success in supporting chatbots and Q&A systems that need to maintain up-to-date information or access domain-specific knowledge.

To learn more about this topic, see the following resources for technical insights and additional information:

    1. https://www.oracle.com/in/artificial-intelligence/generative-ai/retrieval-augmented-generation-rag/

    2. https://research.ibm.com/blog/retrieval-augmented-generation-RAG

    The Dual Phases of RAG: Retrieval and Content Generation

Retrieval-augmented generation (RAG) combines two essential phases: retrieval and content generation. First, retrieval algorithms search external knowledge bases for data that matches the user's query. This retrieved information then becomes the foundation for the next phase, content generation, in which the large language model uses both the augmented prompt and its internal training data to create responses that are not only accurate but also contextually appropriate.
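The two phases can be sketched with a toy word-overlap retriever and a stubbed generation step (purely illustrative; the function and variable names are made up, and a real system would use embeddings and an LLM call):

```python
def retrieve(query: str, knowledge_base: list[str], k: int = 2) -> list[str]:
    """Phase 1: rank documents by naive word overlap with the query."""
    query_words = set(query.lower().split())
    scored = sorted(
        knowledge_base,
        key=lambda doc: len(query_words & set(doc.lower().split())),
        reverse=True,
    )
    return scored[:k]

def generate(query: str, context: list[str]) -> str:
    """Phase 2: build the augmented prompt the LLM would receive (LLM call stubbed)."""
    return f"Context: {' | '.join(context)}\nQuestion: {query}"

kb = [
    "Plan A covers dental and vision.",
    "Plan B has a low monthly premium.",
    "Claims must be filed within 90 days.",
]
prompt = generate("What does Plan A cover?", retrieve("What does Plan A cover?", kb))
```

The point of the sketch is the shape of the pipeline: retrieval narrows the knowledge base down to relevant passages, and generation works from a prompt that combines those passages with the user's question.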

    Advantages of Deploying RAG in AI Chatbots

    Scalability is a key advantage of RAG over traditional models. Instead of relying on a monolithic model attempting to memorize vast amounts of information, RAG models can easily scale by updating or expanding the external database. This flexibility enables them to manage and incorporate a broader range of data efficiently.

    Memory efficiency is another strength of RAG in comparison to models like GPT. While traditional models have limitations on the volume of data they can store and recall, RAG efficiently utilizes external databases. This approach allows RAG to fetch fresh, updated, or detailed information as needed, surpassing the memory constraints of conventional models.

    Moreover, RAG offers flexibility in its knowledge sources. By modifying or enlarging the external knowledge base, a RAG model can be adapted to specific domains without the need for retraining the underlying generative model. This adaptability ensures that RAG remains a versatile and efficient solution for various applications.

The displayed image outlines the application flow. In the development of our health insurance chatbot, we follow a comprehensive ingestion process. Initially, essential PDF documents are loaded to familiarize our model with the intricacies of health insurance. These documents are split into smaller chunks of text for in-depth analysis. Each chunk is then transformed into a numerical vector through a process known as embedding, or vectorization. These numerical representations are stored in ChromaDB for quick retrieval.

When a user poses a question, the chatbot converts the query into its numerical representation and retrieves the most similar chunks from ChromaDB. Employing a large language model (LLM), the chatbot then crafts a nuanced response grounded in this retrieved context. This method ensures a smooth and efficient conversational experience. Armed with a wealth of health insurance information, the chatbot delivers precise and contextually relevant responses to user inquiries, establishing itself as a valuable resource for navigating the complexities of health insurance queries.

    Role of Vector Embedding

    Traditional search engines mainly focus on finding specific words in your search. For example, if you search “best smartphone,” it looks for pages with exactly those words. On the other hand, semantic search is like a more understanding search engine. It tries to figure out what you really mean by considering the context of your words.

Imagine you are planning a vacation and want to find a suitable destination, and you input the query “warm places to visit in winter.” In a traditional search, the engine would look for exact matches of these words on web pages. Results might include pages with those specific terms, but the relevance might vary. A semantic search engine, by contrast, would infer your intent and surface warm winter destinations even if a page never uses those exact words.

    Text, audio, and video can be embedded:

    An embedding is a vector (list) of floating point numbers. The distance between two vectors measures their relatedness. Small distances suggest high relatedness, and large distances suggest low relatedness.

    For example:

    bat: [0.6, -0.3, 0.8, …]

    ball: [0.4, -0.2, 0.7, …]

    wicket: [-0.5, 0.6, -0.2, …]

    In this cricket-themed example, each word (bat, ball, wicket) is represented as a vector in a multi-dimensional space, capturing the semantic relationships between cricket-related terms.

    For a deeper understanding, you may explore additional insights in the following articles:

    1. https://www.datastax.com/guides/what-is-a-vector-embedding

    2. https://www.pinecone.io/learn/vector-embeddings/

    3. https://weaviate.io/blog/vector-embeddings-explained/

    A specialized type of database known as a vector database is essential for storing these numerical representations. In a vector database, data is stored as mathematical vectors, providing a unique way to store and retrieve information. This specialized database greatly facilitates machine learning models in retaining and recalling previous inputs, enabling powerful applications in search, recommendations, and text generation.
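To make the idea concrete, here is a minimal in-memory vector store (a toy sketch to illustrate the concept, not how ChromaDB is actually implemented) that stores vectors by key and answers nearest-neighbor queries using Euclidean distance, reusing the cricket vectors from the example above:

```python
import math

class TinyVectorStore:
    """Toy vector store: maps keys to vectors and answers nearest-neighbor queries."""

    def __init__(self):
        self.items: dict[str, list[float]] = {}

    def add(self, key: str, vector: list[float]) -> None:
        self.items[key] = vector

    def nearest(self, query: list[float]) -> str:
        # Smaller Euclidean distance means higher relatedness
        return min(self.items, key=lambda k: math.dist(query, self.items[k]))

store = TinyVectorStore()
store.add("bat",    [0.6, -0.3, 0.8])
store.add("wicket", [-0.5, 0.6, -0.2])
print(store.nearest([0.4, -0.2, 0.7]))  # a "ball"-like query lands nearest "bat"
```

Real vector databases add persistence, approximate-nearest-neighbor indexes, and metadata filtering on top of this basic store-and-query idea.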

Vector retrieval in a database involves finding the nearest neighbors, or most similar vectors, to a given query vector. Common metrics for measuring vector similarity include:

1. Euclidean distance measures the straight-line separation between two vectors, taking both magnitude and direction into account.

2. Cosine similarity focuses solely on the direction of vectors, measuring how closely they align within the vector space regardless of magnitude.

3. The dot product takes both magnitude and direction into account, offering a versatile measure of the relationship between vectors.
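Using the truncated three-component cricket vectors above purely for illustration, the three metrics can be computed by hand:

```python
import math

def euclidean(a, b):
    # Straight-line distance between two vectors
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def dot(a, b):
    # Sum of element-wise products
    return sum(x * y for x, y in zip(a, b))

def cosine(a, b):
    # Cosine of the angle between the vectors: direction only, magnitude ignored
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))

bat, ball, wicket = [0.6, -0.3, 0.8], [0.4, -0.2, 0.7], [-0.5, 0.6, -0.2]

# "bat" and "ball" point in nearly the same direction (cosine close to 1)...
print(round(cosine(bat, ball), 3))
# ...while "bat" and "wicket" point in roughly opposite directions (negative cosine)
print(round(cosine(bat, wicket), 3))
```

With real embeddings the vectors would have hundreds or thousands of components, but the arithmetic is identical.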

ChromaDB, Pinecone, and Milvus are a few examples of vector databases.

    For our application, we will be using LangChain, OpenAI embedding and LLM, and ChromaDB.

    1. We need to install Python packages required for this application.
    !pip install -U langchain openai chromadb langchainhub pypdf tiktoken

    A. LangChain is a tool that helps you build intelligent applications using language models. It allows you to develop chatbots, personal assistants, and applications that can summarize, analyze, or respond to questions about documents or data. It’s useful for tasks like coding assistance, working with APIs, and other activities that gain an advantage from AI technology.

    B. OpenAI is a renowned artificial intelligence research lab. Installing the OpenAI package provides access to OpenAI’s language models, including powerful ones like GPT-3. This library is crucial if you plan to integrate OpenAI’s language models into your applications.

    C. As mentioned earlier, ChromaDB is a vector database package designed to handle vector data efficiently, making it suitable for applications that involve similarity searches, clustering, or other operations on vectors.

    D. LangChainHub is a handy tool to make your language tasks easier. It begins with helpful prompts and will soon include even more features like chains and agents.

E. pypdf is a library for working with PDF files in Python. It allows reading and manipulating PDF documents, making it useful for tasks such as extracting text or merging PDF files; it is also the package behind LangChain’s PyPDFLoader.

    F. Tiktoken is a Python library designed for counting the number of tokens in a text string without making an API call. This can be particularly useful for managing token limits when working with language models or APIs that have usage constraints.
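As a sketch of how tiktoken fits in, the helper below (a hypothetical function name) counts tokens with tiktoken when the library and its cached encodings are available, and falls back to a rough character-based estimate otherwise:

```python
def count_tokens(text: str, model: str = "gpt-4") -> int:
    """Count tokens with tiktoken when possible; otherwise estimate roughly."""
    try:
        import tiktoken
        encoding = tiktoken.encoding_for_model(model)
        return len(encoding.encode(text))
    except Exception:
        # Rough heuristic: ~4 characters per token for typical English text
        return max(1, len(text) // 4)

print(count_tokens("Health insurance policies can be hard to read."))
```

A check like this lets you trim or split documents before sending them to a model with a fixed context window, without spending an API call.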

2. Importing Libraries
    from langchain.chat_models import ChatOpenAI 
    from langchain.embeddings import OpenAIEmbeddings 
    from langchain.vectorstores import Chroma 
    from langchain.prompts import ChatPromptTemplate 
    from langchain.prompts import PromptTemplate
    from langchain.schema import StrOutputParser 
    from langchain.schema.runnable import RunnablePassthrough

3. Initializing the OpenAI LLM
llm = ChatOpenAI(
    api_key=OPENAI_API_KEY,
    model_name="gpt-4",
    temperature=0.1
)

This code initializes a language model using OpenAI’s GPT-4 model, which has an 8,192-token context window. The temperature parameter influences the randomness of the generated text: a higher temperature results in more creative responses, while a lower temperature leads to more focused and deterministic answers.

4. We will load a PDF containing the reference material and split it into chunks of text that can be fed to the model.
from langchain.document_loaders import PyPDFLoader

loader = PyPDFLoader("HealthInsureBot_GenerativeAI_TrainingGuide.pdf")
docs = loader.load_and_split()

5. We will load these chunks into the vector database ChromaDB using OpenAI embeddings; this store is later used for retrieval.
vectorstore = Chroma.from_documents(
    documents=docs,
    embedding=OpenAIEmbeddings(api_key=OPENAI_API_KEY)
)

6. Create a retriever object that returns the top 3 most similar vector matches for a query.
retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3}
)

7. Creating a prompt to pass to the LLM for obtaining specific information involves crafting a well-structured question or instruction that clearly outlines the desired details. The RAG chain starts with the retriever and formatted documents, passes the result through the custom prompt template, invokes the LLM, and concludes with a string output parser (StrOutputParser()) to handle the resulting response.

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)
    
template = """Use the following pieces of context as a virtual health insurance agent to answer the question and provide a relevance score out of 10 for each response. If you don't know the answer, just say that you don't know; don't try to make up an answer. {context} Question: {question} Helpful Answer:"""
    rag_prompt_custom = PromptTemplate.from_template(template) 
    rag_chain = ( 
    {"context": retriever | format_docs, "question": RunnablePassthrough()} | rag_prompt_custom 
    | llm 
    | StrOutputParser()
    )
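Independent of LangChain, the prompt template above is ordinary placeholder substitution. A minimal standalone illustration (the context and question strings here are made up):

```python
template = (
    "Use the following pieces of context as a virtual health insurance agent "
    "to answer the question. If you don't know the answer, just say that you "
    "don't know.\n\n{context}\n\nQuestion: {question}\nHelpful Answer:"
)

# What the LLM actually receives: retrieved context spliced around the user's question
prompt = template.format(
    context="Plan A covers dental and vision.",
    question="Does Plan A cover dental?",
)
print(prompt)
```

Seeing the fully rendered prompt makes it clear why retrieval quality dominates answer quality: the model can only ground its answer in whatever lands in {context}.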

8. Create a function to get a response from the chatbot.
    def chatbot_response(user_query):
        return rag_chain.invoke(user_query)

We can integrate Streamlit to build the front end, calling this function from the Streamlit application to get the AI response.

    import openai 
    import streamlit as st
    from health_insurance_bot import chatbot_response 
    st.title("Health Insurance Chatbot")
if "messages" not in st.session_state:
    st.session_state["messages"] = [
        {"role": "assistant", "content": "How can I help you?"}
    ]
    
    for msg in st.session_state.messages: 
       st.chat_message(msg["role"]).write(msg["content"]) 
    
    if prompt := st.chat_input(): 
      openai.api_key = st.secrets['openai_api_key']     
      st.session_state.messages.append({"role": "user", "content": prompt}) 
      st.chat_message(name="user").write(prompt) 
      response = chatbot_response(prompt)   
      st.session_state.messages.append({"role": "assistant", "content": response})  
      st.chat_message(name="assistant").write(response)

    Performance Insights

    Conclusion

    In our exploration of developing health insurance chatbots, we’ve dived into the innovative world of retrieval-augmented generation (RAG), where advanced technologies are seamlessly combined to reshape user interactions. The adoption of RAG has proven to be a game-changer, significantly enhancing the chatbot’s abilities to understand, retrieve, and generate contextually relevant responses. However, it’s worth mentioning a couple of limitations, including challenges in accurately calculating premium quotes and occasional inaccuracies in semantic searches.

  • Vector Search: The New Frontier in Personalized Recommendations

    Introduction

    Imagine you are a modern-day treasure hunter, not in search of hidden gold, but rather the wealth of knowledge and entertainment hidden within the vast digital ocean of content. In this realm, where every conceivable topic has its own sea of content, discovering what will truly captivate you is like finding a needle in an expansive haystack.

    This challenge leads us to the marvels of recommendation services, acting as your compass in this digital expanse. These services are the unsung heroes behind the scenes of your favorite platforms, from e-commerce sites that suggest enticing products to streaming services that understand your movie preferences better than you might yourself. They sift through immense datasets of user interactions and content features, striving to tailor your online experience to be more personalized, engaging, and enriching.

But what if I told you that there is a cutting-edge technology that can take personalized recommendations to the next level? Today, I will take you through a journey to build a blog recommendation service that understands the contextual similarities between different pieces of content, transcending basic keyword matching. We’ll harness the power of vector search, a technology that’s revolutionizing personalized recommendations. We’ll explore how recommendation services are traditionally implemented, and then briefly discuss how vector search enhances them.

    Finally, we’ll put this knowledge to work, using OpenAI’s embedding API and Elasticsearch to create a recommendation service that not only finds content but also understands and aligns it with your unique interests.

    Exploring the Landscape: Traditional Recommendation Systems and Their Limits

    Traditionally, these digital compasses, or recommendation systems, employ methods like collaborative and content-based filtering. Imagine sitting in a café where the barista suggests a coffee based on what others with similar tastes enjoyed (collaborative filtering) or based on your past coffee choices (content-based filtering). While these methods have been effective in many scenarios, they come with some limitations. They often stumble when faced with the vast and unstructured wilderness of web data, struggling to make sense of the diverse and ever-expanding content landscape. Additionally, when user preferences are ambiguous or when you want to recommend content by truly understanding it on a semantic level, traditional methods may fall short.

    Enhancing Recommendation with Vector Search and Vector Databases

    Our journey now takes an exciting turn with vector search and vector databases, the modern tools that help us navigate this unstructured data. These technologies transform our café into a futuristic spot where your coffee preference is understood on a deeper, more nuanced level.

    Vector Search: The Art of Finding Similarities

    Vector search operates like a seasoned traveler who understands the essence of every place visited. Text, images, or sounds can be transformed into numerical vectors, like unique coordinates on a map. The magic happens when these vectors are compared, revealing hidden similarities and connections, much like discovering that two seemingly different cities share a similar vibe.

    Vector Databases: Navigating Complex Data Landscapes

    Imagine a vast library of books where each book captures different aspects of a place along with its coordinates. Vector databases are akin to this library, designed to store and navigate these complex data points. They easily handle intricate queries over large datasets, making them perfect for our recommendation service, ensuring no blog worth reading remains undiscovered.

    Embeddings: Semantic Representation

    In our journey, embeddings are akin to a skilled artist who captures not just the visuals but the soul of a landscape. They map items like words or entire documents into real-number vectors, encapsulating their deeper meaning. This helps in understanding and comparing different pieces of content on a semantic level, letting the recommendation service show you things that really match your interests.

    Sample Project: Blog Recommendation Service

    Project Overview

    Now, let’s craft a simple blog recommendation service using OpenAI’s embedding APIs and Elasticsearch as a vector database. The goal is to recommend blogs similar to the current one the user is reading, which can be shown in the read more or recommendation section.

Our blogs service will be responsible for indexing the blogs, finding similar ones, and interacting with the UI Service.

    Tools and Setup

    We will need the following tools to build our service:

    • OpenAI Account: We will be using OpenAI’s embedding API to generate the embeddings for our blog content. You will need an OpenAI account to use the APIs. Once you have created an account, please create an API key and store it in a secure location.
    • Elasticsearch: A popular database renowned for its full-text search capabilities, which can also be used as a vector database, adept at storing and querying complex embeddings with its dense_vector field type.
    • Docker: A tool that allows developers to package their applications and all the necessary dependencies into containers, ensuring that the application runs smoothly and consistently across different computing environments.
    • Python: A versatile programming language for developers across diverse fields, from web development to data science.

    The APIs will be created using the FastAPI framework, but you can choose any framework.

    Steps

    First, we’ll create a BlogItem class to represent each blog. It has only three fields, which will be enough for this demonstration, but real-world entities would have more details to accommodate a wider range of properties and functionalities.

    class BlogItem:
        blog_id: int
        title: str
        content: str
    
        def __init__(self, blog_id: int, title: str, content: str):
            self.blog_id = blog_id
            self.title = title
            self.content = content
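Since BlogItem only carries data, Python’s dataclass decorator expresses the same thing more concisely, generating the __init__ automatically (a stylistic alternative, not a requirement):

```python
from dataclasses import dataclass

@dataclass
class BlogItem:
    blog_id: int
    title: str
    content: str

# Equivalent construction to the hand-written __init__ version
blog = BlogItem(blog_id=1, title="Breakthrough in Heart Disease Treatment",
                content="Researchers have developed a new treatment...")
print(blog.title)
```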

    Elasticsearch Setup:

    • To store the blog data along with its embedding in Elasticsearch, we need to set up a local Elasticsearch cluster and then create an index for our blogs. You can also use a cloud-based version if you have already procured one for personal use.
• Install Docker or Docker Desktop on your machine and create the Elasticsearch and Kibana containers using the Docker Compose file below. Run the following command to create and start the services in the background:
• docker compose -f /path/to/your/docker-compose/file up -d
• You can exclude the file path if you are in the same directory as your docker-compose.yml file. The advantage of using Docker Compose is that it allows you to clean up these resources with just one command:
• docker compose -f /path/to/your/docker-compose/file down
    version: '3'
    
    services:
    
      elasticsearch:
        image: docker.elastic.co/elasticsearch/elasticsearch:<version>
        container_name: elasticsearch
        environment:
          - node.name=docker-cluster
          - discovery.type=single-node
          - cluster.routing.allocation.disk.threshold_enabled=false
          - bootstrap.memory_lock=true
          - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
          - xpack.security.enabled=true
          - ELASTIC_PASSWORD=YourElasticPassword
          - "ELASTICSEARCH_USERNAME=elastic"
        ulimits:
          memlock:
            soft: -1
            hard: -1
        volumes:
          - esdata:/usr/share/elasticsearch/data
        ports:
          - "9200:9200"
        networks:
          - esnet
    
      kibana:
        image: docker.elastic.co/kibana/kibana:<version>
        container_name: kibana
        environment:
          ELASTICSEARCH_HOSTS: http://elasticsearch:9200
          ELASTICSEARCH_USERNAME: elastic
          ELASTICSEARCH_PASSWORD: YourElasticPassword
        ports:
          - "5601:5601"
        networks:
          - esnet
        depends_on:
          - elasticsearch
    
    networks:
      esnet:
        driver: bridge
    
    volumes:
      esdata:
        driver: local

• Connect to the local ES instance and create an index. Our “blogs” index will have a unique blog ID, blog title, blog content, and an embedding field to store the vector representation of the blog content. The text-embedding-ada-002 model used here produces vectors with 1,536 dimensions, so it’s important to use the same value for the dims of the embedding field in the blogs index.
    import logging
    from elasticsearch import Elasticsearch
    
    # Setting up logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(filename)s:%(lineno)d - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    # Elasticsearch client setup
    es = Elasticsearch(hosts="http://localhost:9200",
                       basic_auth=("elastic", "YourElasticPassword"))
    
    
# Create an index with a mapping for embeddings
    def create_index(index_name: str, embedding_dimensions: int):
        try:
            es.indices.create(
                index=index_name,
                body={
                    'mappings': {
                        'properties': {
                            'blog_id': {'type': 'long'},
                            'title': {'type': 'text'},
                            'content': {'type': 'text'},
                            'embedding': {'type': 'dense_vector', 'dims': embedding_dimensions}
                        }
                    }
                }
            )
        except Exception as e:
            logging.error(f"An error occurred while creating index {index_name} : {e}")
    
    
    # Sample usage
    create_index("blogs", 1536)

    Create Embeddings AND Index Blogs:

• We use OpenAI’s Embeddings API to get a vector representation of our blog title and content. I am using the text-embedding-ada-002 model here, which OpenAI recommends for most use cases. The input to text-embedding-ada-002 must not exceed 8,191 tokens (1,000 tokens are roughly equal to 750 words) and cannot be empty.
    from openai import OpenAI, OpenAIError
    
api_key = 'YourApiKey'
    client = OpenAI(api_key=api_key)
    
    def create_embeddings(text: str, model: str = "text-embedding-ada-002") -> list[float]:
        try:
        text = text.replace("\n", " ")
            response = client.embeddings.create(input=[text], model=model)
            logging.info(f"Embedding created successfully")
            return response.data[0].embedding
        except OpenAIError as e:
            logging.error(f"An OpenAI error occurred while creating embedding : {e}")
            raise
        except Exception as e:
            logging.exception(f"An unexpected error occurred while creating embedding  : {e}")
            raise

    • When the blogs get created or the content of the blog gets updated, we will call the create_embeddings function to get text embedding and store it in our blogs index.
    # Define the index name as a global constant
    
    ELASTICSEARCH_INDEX = 'blogs'
    
    def index_blog(blog_item: BlogItem):
        try:
            es.index(index=ELASTICSEARCH_INDEX, body={
                'blog_id': blog_item.blog_id,
                'title': blog_item.title,
                'content': blog_item.content,
            'embedding': create_embeddings(blog_item.title + "\n" + blog_item.content)
            })
        except Exception as e:
            logging.error(f"Failed to index blog with blog id {blog_item.blog_id} : {e}")
            raise

    • Create a Pydantic model for the request body:
from pydantic import BaseModel

class BlogItemRequest(BaseModel):
    title: str
    content: str

    • Create an API to save blogs to Elasticsearch. The UI Service would call this API when a new blog post gets created.
    @app.post("/blogs/")
def save_blog(response: Response, blog_item: BlogItemRequest) -> dict[str, str]:
    blog_id = None
    try:
        # Create a BlogItem instance from the request data
        blog_id = get_blog_id()
            blog_item_obj = BlogItem(
                blog_id=blog_id,
                title=blog_item.title,
                content=blog_item.content,
            )
    
            # Call the index_blog method to index the blog
            index_blog(blog_item_obj)
            return {"message": "Blog indexed successfully", "blog_id": str(blog_id)}
        except Exception as e:
            logging.error(f"An error occurred while indexing blog with blog_id {blog_id} : {e}")
            response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
            return {"error": "Failed to index blog"}

    Finding Relevant Blogs:

    • To find blogs that are similar to the current one, we will compare the current blog’s vector representation with other blogs present in the Elasticsearch index using the cosine similarity function.
    • Cosine similarity is a mathematical measure used to determine the cosine of the angle between two vectors in a multi-dimensional space, often employed to assess the similarity between two documents or data points. 
    • The cosine similarity score ranges from -1 to 1. As the cosine similarity score increases from -1 to 1, it indicates an increasing degree of similarity between the vectors. Higher values represent greater similarity.
    • Create a custom exception to handle a scenario when a blog for a given ID is not present in Elasticsearch.
    class BlogNotFoundException(Exception):
        def __init__(self, message="Blog not found"):
            self.message = message
            super().__init__(self.message)
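The cosine similarity described in the bullets above can be computed directly; a plain-Python illustration of the same formula Elasticsearch evaluates for us:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    # cos(theta) = (a . b) / (|a| * |b|)
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Identical directions score 1.0; orthogonal vectors score 0.0
print(cosine_similarity([1.0, 2.0], [2.0, 4.0]))  # → 1.0 (approximately)
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # → 0.0
```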

• First, we will check whether the current blog is present in the blogs index and get its embedding. This is done to prevent unnecessary calls to OpenAI APIs, as each call consumes tokens. Then, we construct an Elasticsearch DSL query to find the nearest neighbors and return their blog content.
    def get_blog_embedding(blog_id: int) -> Optional[Dict]:
        try:
            response = es.search(index=ELASTICSEARCH_INDEX, body={
                'query': {
                    'term': {
                        'blog_id': blog_id
                    }
                },
                '_source': ['title', 'content', 'embedding']  # Fetch title, content and embedding
            })
    
            if response['hits']['hits']:
                logging.info(f"Blog found with blog_id {blog_id}")
                return response['hits']['hits'][0]['_source']
            else:
                logging.info(f"No blog found with blog_id {blog_id}")
                return None
        except Exception as e:
            logging.error(f"Error occurred while searching for blog with blog_id {blog_id}: {e}")
            raise
    
    
    def find_similar_blog(current_blog_id: int, num_neighbors=2) -> list[dict[str, str]]:
        try:
            blog_data = get_blog_embedding(current_blog_id)
            if not blog_data:
                raise BlogNotFoundException(f"Blog not found for id:{current_blog_id}")
            blog_embedding = blog_data['embedding']
            if not blog_embedding:
            blog_embedding = create_embeddings(blog_data['title'] + '\n' + blog_data['content'])
            # Find similar blogs using the embedding
            response = es.search(index=ELASTICSEARCH_INDEX, body={
                'size': num_neighbors + 1,  # Retrieve extra result as we'll exclude the current blog
                '_source': ['title', 'content', 'blog_id', '_score'],
                'query': {
                    'bool': {
                        'must': {
                            'script_score': {
                                'query': {'match_all': {}},
                                'script': {
                                    'source': "cosineSimilarity(params.query_vector, 'embedding')",
                                    'params': {'query_vector': blog_embedding}
                                }
                            }
                        },
                        'must_not': {
                            'term': {
                                'blog_id': current_blog_id  # Exclude the current blog
                            }
                        }
                    }
                }
            })
    
            # Extract and return the hits
            hits = [
                {
                    'title': hit['_source']['title'],
                    'content': hit['_source']['content'],
                    'blog_id': hit['_source']['blog_id'],
                    'score': f"{hit['_score'] * 100:.2f}%"
                }
                for hit in response['hits']['hits']
                if hit['_source']['blog_id'] != current_blog_id
            ]
    
            return hits
        except Exception as e:
        logging.error(f"An error occurred while finding similar blogs: {e}")
            raise

    • Define a Pydantic model for the response:
    class BlogRecommendation(BaseModel):
        blog_id: int
        title: str
        content: str
        score: str

• Create an API that the UI Service can use to find blogs similar to the one the user is currently reading:
    @app.get("/recommend-blogs/{current_blog_id}")
    def recommend_blogs(
            response: Response,
            current_blog_id: int,
            num_neighbors: Optional[int] = 2) -> Union[Dict[str, str], List[BlogRecommendation]]:
        try:
            # Call the find_similar_blog function to get recommended blogs
            recommended_blogs = find_similar_blog(current_blog_id, num_neighbors)
            return recommended_blogs
        except BlogNotFoundException as e:
            response.status_code = status.HTTP_400_BAD_REQUEST
            return {"error": f"Blog not found for id:{current_blog_id}"}
        except Exception as e:
            response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
            return {"error": "Unable to process the request"}

    • The below flow diagram summarizes all the steps we have discussed so far:

    Testing the Recommendation Service

• Ideally, we would receive the blog ID from the UI Service and pass the recommendations back, but for illustration purposes we’ll call the recommend-blogs API with some inputs from my test dataset. The blogs in this sample dataset have concise titles and content, which are sufficient for testing, but real-world blogs will be much more detailed. The test dataset has around 1,000 blogs in various categories like healthcare, tech, travel, and entertainment.
    • A sample from the test dataset:


    • Test Result 1: Medical Research Blog

      Input Blog: Blog_Id: 1, Title: Breakthrough in Heart Disease Treatment, Content: Researchers have developed a new treatment for heart disease that promises to be more effective and less invasive. This breakthrough could save millions of lives every year.


    • Test Result 2: Travel Blog

      Input Blog: Blog_Id: 4, Title: Travel Tips for Sustainable Tourism, Content: How to travel responsibly and sustainably.


    I manually tested multiple blogs from the test dataset of 1,000 blogs, representing distinct topics and content, and assessed the quality and relevance of the recommendations. The recommended blogs had scores in the range of 87% to 95%, and upon examination, the blogs often appeared very similar in content and style.

    Based on the test results, it’s evident that utilizing vector search enables us to effectively recommend blogs to users that are semantically similar. This approach ensures that the recommendations are contextually relevant, even when the blogs don’t share identical keywords, enhancing the user’s experience by connecting them with content that aligns more closely with their interests and search intent.
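    The similarity scores above come from comparing blog embedding vectors. As a toy illustration of how such a score is computed (the four-dimensional vectors below are made up; real embedding models emit hundreds of dimensions), cosine similarity can be sketched as:

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two vectors: 1.0 means identical direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Toy "embeddings"; real models emit hundreds of dimensions.
heart_blog   = [0.9, 0.1, 0.2, 0.0]
cardiac_blog = [0.8, 0.2, 0.1, 0.1]
travel_blog  = [0.1, 0.9, 0.0, 0.3]

print(round(cosine_similarity(heart_blog, cardiac_blog), 2))  # high: similar topics
print(round(cosine_similarity(heart_blog, travel_blog), 2))   # low: unrelated topics
```

    Semantically related blogs point in similar directions in embedding space, so their cosine similarity is high even without shared keywords.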

    Limitations

    This approach for finding similar blogs is good enough for our simple recommendation service, but it might have certain limitations in real-world applications.

    • Our similarity search returns the nearest k neighbors as recommendations, but there might be scenarios where no similar blog might exist or the neighbors might have significant score differences. To deal with this, you can set a threshold to filter out recommendations below a certain score. Experiment with different threshold values and observe their impact on recommendation quality. 
    • If your use case involves a small dataset and the relationships between user preferences and item features are straightforward and well-defined, traditional methods like content-based or collaborative filtering might be more efficient and effective than vector search.
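    A minimal sketch of the thresholding idea from the first point, using hypothetical hit dictionaries shaped like the search results discussed above (the blog titles and the 0.85 cutoff are illustrative only):

```python
# Hypothetical hits shaped like the similarity-search results, with scores in 0..1.
hits = [
    {"blog_id": 2, "title": "Advances in Cardiac Care", "score": 0.93},
    {"blog_id": 7, "title": "Budget Travel Hacks", "score": 0.61},
    {"blog_id": 9, "title": "New Hypertension Guidelines", "score": 0.88},
]

SCORE_THRESHOLD = 0.85  # experiment with this value and observe recommendation quality

def filter_recommendations(hits, threshold=SCORE_THRESHOLD):
    """Drop neighbors whose similarity score falls below the threshold."""
    return [hit for hit in hits if hit["score"] >= threshold]

print([h["blog_id"] for h in filter_recommendations(hits)])
```

    With a cutoff in place, a weakly related neighbor is simply dropped instead of surfacing as a poor recommendation.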

    Further Improvements

    • Using LLM for Content Validation: Implement a verification step using large language models (LLMs) to assess the relevance and validity of recommended content. This approach can ensure that the suggestions are not only similar in context but also meaningful and appropriate for your audience.
    • Metadata-based Embeddings: Instead of generating embeddings from the entire blog content, utilize LLMs to extract key metadata such as themes, intent, tone, or key points. Create embeddings based on this extracted metadata, which can lead to more efficient and targeted recommendations, focusing on the core essence of the content rather than its entirety.

    Conclusion

    Our journey concludes here, but yours is just beginning. Armed with the knowledge of vector search, vector databases, and embeddings, you’re now ready to build a recommendation service that doesn’t just guide users to content but connects them to the stories, insights, and experiences they seek. It’s not just about building a service; it’s about enriching the digital exploration experience, one recommendation at a time.

  • Mage: Your New Go-To Tool for Data Orchestration

    In our journey to automate data pipelines, we’ve used tools like Apache Airflow, Dagster, and Prefect to manage complex workflows. However, as data automation continues to change, we’ve added a new tool to our toolkit: Mage AI.

    Mage AI isn’t just another tool; it’s a solution to the evolving demands of data automation. This blog aims to explain how Mage AI is changing the way we automate data pipelines by addressing challenges and introducing innovative features. Let’s explore this evolution, understand the problems we face, and see why we’ve adopted Mage AI.

    What is Mage AI?

    Mage is a user-friendly open-source framework created for transforming and merging data, and a valuable tool for developers handling substantial data volumes efficiently. At its heart, Mage relies on “data pipelines” made up of code blocks. These blocks can run independently or as part of a larger pipeline; together they form a structure known as a directed acyclic graph (DAG), which manages dependencies. For example, you can use Mage for tasks like loading data, transforming it, or exporting it.
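    Conceptually, a pipeline is just a set of block functions wired into a DAG and executed in dependency order. A framework-free sketch of that idea (the block names and data are made up; real Mage blocks live in their own files and use Mage's decorators):

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

# Each block is a plain function; edges say "block X depends on block Y".
def load_data():
    return [1, 2, 3]

def transform(rows):
    return [r * 10 for r in rows]

def export(rows):
    return f"exported {len(rows)} rows"

# DAG: load_data -> transform -> export
dag = {"transform": {"load_data"}, "export": {"transform"}}
blocks = {"load_data": load_data, "transform": transform, "export": export}

# Run blocks in topological order, feeding each one its upstream outputs.
results = {}
for name in TopologicalSorter(dag).static_order():
    upstream = [results[dep] for dep in dag.get(name, ())]
    results[name] = blocks[name](*upstream)

print(results["export"])
```

    The topological sort guarantees every block runs only after its upstream dependencies have produced their outputs, which is exactly what a DAG-based orchestrator manages for you.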

    Mage Architecture:

    Before we delve into Mage’s features, let’s take a look at how it works.

    When you use Mage, your request begins its journey in the Mage Server Container, which serves as the central hub for handling requests, processing data, and validation. Here, tasks like data processing and real-time interactions occur. The Scheduler Process ensures tasks are scheduled with precision, while Executor Containers, designed for specific tasks like Python or AWS, carry out the instructions.

    Mage’s scalability is impressive, allowing it to handle growing workloads effectively. It can expand both vertically and horizontally to maintain top-notch performance. Mage efficiently manages data, including code, data, and logs, and takes security seriously when handling databases and sensitive information. This well-coordinated system, combined with Mage’s scalability, guarantees reliable data pipelines, blending technical precision with seamless orchestration.

    Scaling Mage:

    To enhance Mage’s performance and reliability as your workload expands, it’s crucial to scale its architecture effectively. In this concise guide, we’ll concentrate on four key strategies for optimizing Mage’s scalability:

    1. Horizontal Scaling: Ensure responsiveness by running multiple Mage Server and Scheduler instances. This approach keeps the system running smoothly, even during peak usage.
    2. Multiple Executor Containers: Deploy several Executor Containers to handle concurrent task execution. Customize them for specific executors (e.g., Python, PySpark, or AWS) to scale task processing horizontally as needed.
    3. External Load Balancers: Utilize external load balancers to distribute client requests across Mage instances. This not only boosts performance but also ensures high availability by preventing overloading of a single server.
    4. Scaling for Larger Datasets: To efficiently handle larger datasets, consider:

    a. Allocating more resources to executors, empowering them to tackle complex data transformations.

    b. Using Mage’s direct data warehouse transformations and native Spark integration for massive datasets.

    Features: 

    1) Interactive Coding Experience

    Mage offers an interactive coding experience tailored for data preparation. Each block in the editor is a modular file that can be tested, reused, and chained together to create an executable data pipeline. This means you can build your data pipeline piece by piece, ensuring reliability and efficiency.

    2) UI/IDE for Building and Managing Data Pipelines

    Mage takes data pipeline development to the next level with a user-friendly integrated development environment (IDE). You can build and manage your data pipelines through an intuitive user interface, making the process efficient and accessible to both data scientists and engineers.

    3) Multiple Languages Support

    Mage supports writing pipelines in multiple languages such as Python, SQL, and R. This language versatility means you can work with the languages you’re most comfortable with, making your data preparation process more efficient.

    4) Multiple Types of Pipelines

    Mage caters to diverse data pipeline needs. Whether you require standard batch pipelines, data integration pipelines, streaming pipelines, Spark pipelines, or DBT pipelines, Mage has you covered.

    5) Built-In Engineering Best Practices

    Mage is not just a tool; it’s a promoter of good coding practices. It enables reusable code, data validation in each block, and operationalizes data pipelines with built-in observability, data quality monitoring, and lineage. This ensures that your data pipelines are not only efficient but also maintainable and reliable.

    6) Dynamic Blocks

    Dynamic blocks in Mage allow the output of a block to dynamically create additional blocks. These blocks are spawned at runtime, with the total number of blocks created being equal to the number of items in the output data of the dynamic block multiplied by the number of its downstream blocks.
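    For example, with made-up numbers: a dynamic block whose output contains 3 items and which has 2 downstream blocks spawns 3 × 2 = 6 blocks at runtime:

```python
# Made-up illustration of the spawn count described above.
dynamic_block_output = ["user_a", "user_b", "user_c"]  # 3 items in the output data
downstream_blocks = ["enrich", "notify"]               # 2 downstream blocks

# One runtime block per (item, downstream block) pair.
spawned = [(item, block)
           for item in dynamic_block_output
           for block in downstream_blocks]

print(len(spawned))  # 3 items x 2 downstream blocks = 6 runtime blocks
```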

    7) Triggers

    • Schedule Triggers: These triggers allow you to set specific start dates and intervals for pipeline runs. Choose from daily, weekly, or monthly, or even define custom schedules using Cron syntax. Mage’s Schedule Triggers put you in control of when your pipelines execute.
    • Event Triggers: With Event Triggers, your pipelines respond instantly to specific events, such as the completion of a database query or the creation of a new object in cloud storage services like Amazon S3 or Google Storage. Real-time automation at your fingertips.
    • API Triggers: API Triggers enable your pipelines to run in response to specific API calls. Whether it’s customer requests or external system interactions, these triggers ensure your data workflows stay synchronized with the digital world.

    Different Types of Blocks: 

    Data Loader: Within Mage, Data Loaders are ready-made templates designed to seamlessly link up with a multitude of data sources, from Postgres, BigQuery, Redshift, and S3 to various others. Additionally, Mage allows for the creation of custom data loaders, enabling connections to APIs. The primary role of Data Loaders is to retrieve data from these designated sources.

    Data Transformer: Much like Data Loaders, Data Transformers provide predefined functions such as handling duplicates, managing missing data, and excluding specific columns. Alternatively, you can craft your own data transformations or merge outputs from multiple data loaders to preprocess and sanitize the data before it advances through the pipeline.

    Data Exporter: Data Exporters within Mage empower you to dispatch data to a diverse array of destinations, including databases, data lakes, data warehouses, or local storage. You can opt for predefined export templates or craft custom exporters tailored to your precise requirements.

    Custom Blocks: Custom blocks in the Mage framework are incredibly flexible and serve various purposes. They can store configuration data and facilitate its transmission across different pipeline stages. Additionally, they prove invaluable for logging purposes, allowing you to categorize and visually distinguish log entries for enhanced organization.

    Sensor: A Sensor, a specialized block within Mage, continuously assesses a condition until it’s met or until a specified time duration has passed. When a block depends on a sensor, it remains inactive until the sensor confirms that its condition has been satisfied. Sensors are especially valuable when there’s a need to wait for external dependencies or handle delayed data before proceeding with downstream tasks.

    Getting Started with Mage

    There are two ways to run Mage: using Docker or using pip:
    Docker Command

    Create a new working directory where all the mage files will be stored.

    Then, in that working directory, execute this command:

    Windows CMD: 

    docker run -it -p 6789:6789 -v %cd%:/home/src mageai/mageai /app/run_app.sh mage start [project_name]

    Linux CMD:

    docker run -it -p 6789:6789 -v $(pwd):/home/src mageai/mageai /app/run_app.sh mage start [project_name]

    Using Pip (Working directory):

    pip install mage-ai

    mage start [project_name]

    You can browse to http://localhost:6789/overview to get to the Mage UI.

    Let’s build our first pipeline to fetch CSV files from the API for data loading, do some useful transformations, and export that data to our local database.

    The dataset is an invoices CSV file stored in the current working directory, with the following columns:

    (1) First Name; (2) Last Name; (3) E-mail; (4) Product ID; (5) Quantity; (6) Amount; (7) Invoice Date; (8) Address; (9) City; (10) Stock Code

    Create a new pipeline and select a standard batch (we’ll be implementing a batch pipeline) from the dashboard and give it a unique ID.

    Project structure:

    ├── mage_data
    └── [project_name]
        ├── charts
        ├── custom
        ├── data_exporters
        ├── data_loaders
        ├── dbt
        ├── extensions
        ├── pipelines
        │   └── [pipeline_name]
        │       ├── __init__.py
        │       └── metadata.yaml
        ├── scratchpads
        ├── transformers
        ├── utils
        ├── __init__.py
        ├── io_config.yaml
        ├── metadata.yaml
        └── requirements.txt

    This pipeline consists of all the block files (data loader, transformer, charts) along with the pipeline’s configuration files, io_config.yaml and metadata.yaml. Each block file contains an inbuilt decorated function where we write our code.

    1. We begin by loading a CSV file from our local directory, specifically located at /home/src/invoice.csv. To achieve this, we select the “Local File” option from the Templates dropdown and configure the Data Loader block accordingly. Running this configuration will allow us to confirm if the CSV file loads successfully.

    2. In the next step, we introduce a Transformer block using a generic template. On the right side of the user interface, we can observe the directed acyclic graph (DAG) tree. To establish the data flow, we edit the parent of the Transformer block, linking it either directly to the Data Loader block or via the user interface.

    The Transformer block operates on the data frame received from the upstream Data Loader block, which is passed as the first argument to the Transformer function.

    3. Our final step involves exporting the DataFrame to a locally hosted PostgreSQL database. We incorporate a Data Export block and connect it to the Transformer block.

    To establish a connection with the PostgreSQL database, it is imperative to configure the database credentials in the io_config.yaml file. Alternatively, these credentials can be added to environmental variables.

    With these steps completed, we have successfully constructed a foundational batch pipeline. This pipeline efficiently loads, transforms, and exports data, serving as a fundamental building block for more advanced data processing tasks.
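    The three steps above can be sketched outside Mage with standard-library stand-ins — csv for the Data Loader, a plain function for the Transformer, and sqlite3 in place of PostgreSQL (the inlined invoice rows and the column subset are illustrative only; a real pipeline would read /home/src/invoice.csv and write to Postgres):

```python
import csv, io, sqlite3

# 1) Load: parse an invoices CSV (inlined here for a self-contained example).
raw = """First Name,Last Name,Amount
Ada,Lovelace,120.50
Alan,Turing,99.99
"""
rows = list(csv.DictReader(io.StringIO(raw)))

# 2) Transform: cast Amount to float -- the kind of cleanup a Transformer block does.
for row in rows:
    row["Amount"] = float(row["Amount"])

# 3) Export: write to a database (sqlite3 standing in for PostgreSQL).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE invoices (first_name TEXT, last_name TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO invoices VALUES (?, ?, ?)",
    [(r["First Name"], r["Last Name"], r["Amount"]) for r in rows],
)

total = conn.execute("SELECT SUM(amount) FROM invoices").fetchone()[0]
print(total)
```

    In Mage, each numbered step would live in its own block file, and the framework would pass each block's output to the next block in the DAG.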

    Mage vs. Other Tools:

    Consistency Across Environments: Some orchestration tools may exhibit inconsistencies between local development and production environments due to varying configurations. Mage tackles this challenge by providing a consistent and reproducible workflow environment through a single configuration file that can be executed uniformly across different environments.

    Reusability: Achieving reusability in workflows can be complex in some tools. Mage simplifies this by allowing tasks and workflows to be defined as reusable components within a Magefile, making it effortless to share them across projects and teams.

    Data Passing: Efficiently passing data between tasks can be a challenge in certain tools, especially when dealing with large datasets. Mage streamlines data passing through straightforward function arguments and returns, enabling seamless data flow and versatile data handling.

    Testing: Some tools lack user-friendly testing utilities, resulting in manual testing and potential coverage gaps. Mage simplifies testing with a robust testing framework that enables the definition of test cases, inputs, and expected outputs directly within the Mage file.

    Debugging: Debugging failed tasks can be time-consuming with certain tools. Mage enhances debugging with detailed logs and error messages, offering clear insights into the causes of failures and expediting issue resolution.

    Conclusion: 

    Mage offers a streamlined and user-friendly approach to data pipeline orchestration, addressing common challenges with simplicity and efficiency. Its single-container deployment, visual interface, and robust features make it a valuable tool for data professionals seeking an intuitive and consistent solution for managing data workflows.

  • Parallelizing Heavy Read and Write Queries to SQL Datastores using Spark and more!

    The amount of data in our world has been exploding exponentially day by day, and processing and analyzing this Big Data has become key to making informed, data-driven decisions. Spark is a unified distributed data processing engine for Big Data: it lets you process data faster by splitting the work into chunks and assigning those chunks to computation resources across nodes. It can handle up to petabytes of data (millions of gigabytes), and it processes data in memory, which makes it fast.

    We talked about processing Big Data in Spark, but Spark doesn’t store any data itself the way file systems or databases do. So, to process data in Spark, we must read data from a data source, clean or process it, and store the result in a target data source. Data sources can be files, APIs, databases, or streams.

    Database management systems have been around for decades. Many applications generate huge amounts of data and store it in database management systems, and we often need to connect Spark to a database to process that data.

    In this blog, we are going to discuss how to use Spark to read from and write to databases in parallel. Our focus will be on different methods of reading and writing data, which will help us move terabytes of data efficiently.

    Reading / Writing data from/to Database using Spark:

    To read data or write data from/to the database, we will need to perform a few basic steps regardless of any programming language or framework we are using. What follows is an overview of the steps to read data from databases.

    Step 1: Register Driver or Use Connector

    Get the respective driver of your database and register the driver, or use the connector to connect to the database.

    Step 2: Make a connection

    Next, the driver or connector makes a connection to the database.

    Step 3: Run query statement

    Using the connection created in the previous step, execute the query, which will return the result.

    Step 4: Process result

    Process the result obtained in the previous step as per your requirements.

    Step 5: Close the connection
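    In plain Python, the five steps map directly onto the DB-API, shown here with sqlite3 so the example is self-contained (a real Postgres or JDBC driver follows the same shape; the table and values are made up):

```python
import sqlite3

# Steps 1-2: importing the driver module "registers" it; connect() opens a connection.
conn = sqlite3.connect(":memory:")

# Step 3: run query statements using the connection.
conn.execute("CREATE TABLE covid (state TEXT, confirmed INTEGER)")
conn.execute("INSERT INTO covid VALUES ('Uttarakhand', 3259)")
cursor = conn.execute("SELECT state, confirmed FROM covid")

# Step 4: process the result.
rows = cursor.fetchall()
for state, confirmed in rows:
    print(state, confirmed)

# Step 5: close the connection.
conn.close()
```

    Spark's JDBC reader performs these same steps under the hood, once per connection it opens.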

    Dataset we are using:

    Covid data

    This dataset contains details of COVID patients across all states. It has different information such as State, Confirmed, Recovered, Deceased, Other, Tested, and Date.

    You can load this dataset in any of the databases you work with and can try out the entire discussion practically.

    The following image shows ten records of the entire dataset.

    Single-partition Spark program:

    ## Single-partition read and write using the Spark JDBC API.
    from pyspark.sql import SparkSession
    
    ## Creating a spark session and adding the Postgres driver to Spark.
    spark_session = SparkSession.builder \
        .master("local") \
        .appName("Databases") \
        .config("spark.jars.packages", "org.postgresql:postgresql:42.2.8") \
        .getOrCreate()
    
    hostname = "localhost"
    jdbc_port = 5432
    dbname = "aniket"
    username = "postgres"
    password = "pass@123"
    
    jdbc_url = "jdbc:postgresql://{0}:{1}/{2}".format(hostname, jdbc_port, dbname)
    
    ## reading data
    table_data_df = spark_session.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "aniket") \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .load()
    
    ## writing data
    table_data_df.write \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "spark_schema.zipcode_table") \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .save()

    Spark provides an API for reading data from a database that is very simple to use. First, we create a Spark session, then add the driver to Spark; it can be added through the program itself or via the shell.

    The import at the top brings in the SparkSession class, the entry point to programming Spark with the Dataset and DataFrame API.

    From the fifth to the ninth line of the above code, we create a Spark session running locally; this session is used to interact with our Spark application. We specify a name for the application using appName(), in our case ‘Databases’; this name is shown on the Web UI for the cluster. Next, we can pass any configuration for the Spark application using config(). Here we specify the Postgres JDBC driver package, which is used to create a connection to the Postgres database. You can specify the driver of any available database.

    To connect to the database, we need the hostname, port, database name, username, and password. Those details are on lines 11 to 15 of the above code.

    Refer to lines 19 to 27 in the above snippet. At this point we have our Spark session and all the information needed to connect to the database. Using the Spark read API, we read the data from the database. This creates a connection to the Postgres database from one of the cores allocated to the Spark application, and through that connection the data is read into the table_data_df dataframe. Even if we have multiple cores for our application, only one connection is created from one of the cores; the rest sit idle. We will discuss how to utilize all cores shortly.

    Refer to lines 29 to 37 in the above snippet. Now that we have the data, let’s write it to the database. Using the Spark write API, we write the data out. This also creates only one connection to the database from one of the cores allocated to the Spark application; even with more cores available, the above code still uses only one.

    Output of Program:

    /usr/bin/python3.8 /home/aniketrajput/aniket_work/Spark/main.py
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
    WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    :: loading settings :: url = jar:file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
    Ivy Default Cache set to: /home/aniketrajput/.ivy2/cache
    The jars for the packages stored in: /home/aniketrajput/.ivy2/jars
    org.postgresql#postgresql added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent-83662a49-0573-46c3-be8e-0a280f96c7d8;1.0
    	confs: [default]
    	found org.postgresql#postgresql;42.2.8 in central
    :: resolution report :: resolve 113ms :: artifacts dl 3ms
    	:: modules in use:
    	org.postgresql#postgresql;42.2.8 from central in [default]
    	---------------------------------------------------------------------
    	|                  |            modules            ||   artifacts   |
    	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    	---------------------------------------------------------------------
    	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
    	---------------------------------------------------------------------
    :: retrieving :: org.apache.spark#spark-submit-parent-83662a49-0573-46c3-be8e-0a280f96c7d8
    	confs: [default]
    	0 artifacts copied, 1 already retrieved (0kB/5ms)
    22/04/22 11:55:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    |        state|         district|confirmed|recovered|deceased|other|tested|      date|
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    |Uttar Pradesh|         Varanasi|    23512|    23010|     456|    0|595510|2021-02-24|
    |  Uttarakhand|           Almora|     3259|     3081|      25|  127| 84443|2021-02-24|
    |  Uttarakhand|        Bageshwar|     1534|     1488|      17|   26| 55626|2021-02-24|
    |  Uttarakhand|          Chamoli|     3486|     3373|      15|   88| 90390|2021-02-24|
    |  Uttarakhand|        Champawat|     1819|     1790|       9|    7| 95068|2021-02-24|
    |  Uttarakhand|         Dehradun|    29619|    28152|     962|  439|401496|2021-02-24|
    |  Uttarakhand|         Haridwar|    14137|    13697|     158|  175|369542|2021-02-24|
    |  Uttarakhand|         Nainital|    12636|    12254|     237|   79|204422|2021-02-24|
    |  Uttarakhand|    Pauri Garhwal|     5145|     5033|      60|   24|138878|2021-02-24|
    |  Uttarakhand|      Pithoragarh|     3361|     3291|      47|   11| 72686|2021-02-24|
    |  Uttarakhand|      Rudraprayag|     2270|     2251|      10|    7| 52378|2021-02-24|
    |  Uttarakhand|    Tehri Garhwal|     4227|     4026|      16|  170|105111|2021-02-24|
    |  Uttarakhand|Udham Singh Nagar|    11538|    11267|     117|  123|337292|2021-02-24|
    |  Uttarakhand|       Uttarkashi|     3789|     3645|      17|  118|120026|2021-02-24|
    |  West Bengal|       Alipurduar|     7705|     7616|      86|    0|  null|2021-02-24|
    |  West Bengal|          Bankura|    11940|    11788|      92|    0|  null|2021-02-24|
    |  West Bengal|          Birbhum|    10035|     9876|      89|    0|  null|2021-02-24|
    |  West Bengal|      Cooch Behar|    11835|    11756|      72|    0|  null|2021-02-24|
    |  West Bengal| Dakshin Dinajpur|     8179|     8099|      74|    0|  null|2021-02-24|
    |  West Bengal|       Darjeeling|    18423|    18155|     203|    0|  null|2021-02-24|
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    only showing top 20 rows
    
    
    Process finished with exit code 0

    Multiple-partition Spark program:

    from pyspark.sql import SparkSession
    
    ## Creating a spark session and adding the Postgres driver to Spark.
    spark_session = SparkSession.builder \
        .master("local[4]") \
        .appName("Databases") \
        .config("spark.jars.packages", "org.postgresql:postgresql:42.2.8") \
        .getOrCreate()
    
    hostname = "localhost"
    jdbc_port = 5432
    dbname = "postgres"
    username = "postgres"
    password = "pass@123"
    
    jdbc_url = "jdbc:postgresql://{0}:{1}/{2}".format(hostname, jdbc_port, dbname)
    
    partition_column = 'date'
    lower_bound = '2021-02-20'
    upper_bound = '2021-02-28'
    num_partitions = 4
    
    ## reading data
    table_data_df = spark_session.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "covid_data") \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .option("partitionColumn", partition_column) \
        .option("lowerBound", lower_bound) \
        .option("upperBound", upper_bound) \
        .option("numPartitions", num_partitions) \
        .load()
    
    table_data_df.show()
    
    ## writing data
    table_data_df.write \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "covid_data_output") \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .option("numPartitions", num_partitions) \
        .save()

    As promised in the last section, let’s discuss how to improve resource utilization. Previously we had only one connection, using very limited resources and leaving the rest idle. To get around this, the Spark read and write APIs accept a few extra options: partitionColumn, lowerBound, and upperBound. These options must all be specified if any of them is specified, and numPartitions must be specified as well. They describe how to partition the table when reading in parallel from multiple workers. Each partition gets its own core with its own connection performing the reads or writes, so the database operations run in parallel.

    This is an efficient way of reading and writing data from databases in spark rather than just doing it with one partition. 

    Partitions are decided by the Spark API in the following way.

    Let’s consider an example where:

    lowerBound: 0

    upperBound: 1000

    numPartitions: 10

    Stride is equal to 100, and partitions correspond to the following queries:

    SELECT * FROM table WHERE partitionColumn BETWEEN 0 AND 100 

    SELECT * FROM table WHERE partitionColumn BETWEEN 100 AND 200 

    ...

    SELECT * FROM table WHERE partitionColumn > 900

    BETWEEN here is exclusive on the upper bound, so adjacent partitions do not overlap.

    Now we have data in multiple partitions. Each executor can have one or more partitions based on cluster configuration. Suppose we have 10 cores and 10 partitions. One partition of data can be fetched from one executor using one core. So, 10 partitions of data can be fetched from 10 executors. Each of these executors will create the connection to the database and will read the data.

    Note: lowerBound and upperBound do not filter the data; they just help Spark decide the stride.

         partitionColumn must be a numeric, date, or timestamp column from the table.
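    The stride arithmetic can be reproduced in a few lines (a sketch of the logic rather than Spark's exact implementation; note that the first and last partitions are open-ended, which is why lowerBound and upperBound never filter rows out):

```python
def partition_predicates(column, lower, upper, num_partitions):
    """Approximate the WHERE clauses generated for a numeric partitionColumn."""
    stride = (upper - lower) // num_partitions
    # First partition is open on the low end: catches everything below lower+stride.
    preds = [f"{column} < {lower + stride}"]
    # Middle partitions each cover one stride, inclusive low / exclusive high.
    for i in range(1, num_partitions - 1):
        lo = lower + i * stride
        preds.append(f"{column} >= {lo} AND {column} < {lo + stride}")
    # Last partition is open on the high end: catches everything from the top stride up.
    preds.append(f"{column} >= {lower + (num_partitions - 1) * stride}")
    return preds

preds = partition_predicates("partitionColumn", 0, 1000, 10)
print(preds[0])   # first partition
print(preds[-1])  # last partition catches everything >= 900
```

    Each predicate becomes one query issued over its own connection, which is how the table is read in parallel.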

    There are also attributes that can optimize the write operation. One is “batchsize”, the JDBC batch size, which determines how many rows to insert per round trip; it can help the performance of JDBC drivers and applies only to writing. Another, “truncate”, is a JDBC writer-related option: when SaveMode.Overwrite is enabled, it causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient and prevents the table metadata (e.g., indices) from being removed.

    Output of Program:

    /usr/bin/python3.8 /home/aniketrajput/aniket_work/Spark/main.py
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
    WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    :: loading settings :: url = jar:file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
    Ivy Default Cache set to: /home/aniketrajput/.ivy2/cache
    The jars for the packages stored in: /home/aniketrajput/.ivy2/jars
    org.postgresql#postgresql added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent-8047b5cc-11e8-4efb-8a38-70edab0d5404;1.0
    	confs: [default]
    	found org.postgresql#postgresql;42.2.8 in central
    :: resolution report :: resolve 104ms :: artifacts dl 3ms
    	:: modules in use:
    	org.postgresql#postgresql;42.2.8 from central in [default]
    	---------------------------------------------------------------------
    	|                  |            modules            ||   artifacts   |
    	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    	---------------------------------------------------------------------
    	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
    	---------------------------------------------------------------------
    :: retrieving :: org.apache.spark#spark-submit-parent-8047b5cc-11e8-4efb-8a38-70edab0d5404
    	confs: [default]
    	0 artifacts copied, 1 already retrieved (0kB/4ms)
    22/04/22 12:20:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    |        state|         district|confirmed|recovered|deceased|other|tested|      date|
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    |Uttar Pradesh|         Varanasi|    23512|    23010|     456|    0|595510|2021-02-24|
    |  Uttarakhand|           Almora|     3259|     3081|      25|  127| 84443|2021-02-24|
    |  Uttarakhand|        Bageshwar|     1534|     1488|      17|   26| 55626|2021-02-24|
    |  Uttarakhand|          Chamoli|     3486|     3373|      15|   88| 90390|2021-02-24|
    |  Uttarakhand|        Champawat|     1819|     1790|       9|    7| 95068|2021-02-24|
    |  Uttarakhand|         Dehradun|    29619|    28152|     962|  439|401496|2021-02-24|
    |  Uttarakhand|         Haridwar|    14137|    13697|     158|  175|369542|2021-02-24|
    |  Uttarakhand|         Nainital|    12636|    12254|     237|   79|204422|2021-02-24|
    |  Uttarakhand|    Pauri Garhwal|     5145|     5033|      60|   24|138878|2021-02-24|
    |  Uttarakhand|      Pithoragarh|     3361|     3291|      47|   11| 72686|2021-02-24|
    |  Uttarakhand|      Rudraprayag|     2270|     2251|      10|    7| 52378|2021-02-24|
    |  Uttarakhand|    Tehri Garhwal|     4227|     4026|      16|  170|105111|2021-02-24|
    |  Uttarakhand|Udham Singh Nagar|    11538|    11267|     117|  123|337292|2021-02-24|
    |  Uttarakhand|       Uttarkashi|     3789|     3645|      17|  118|120026|2021-02-24|
    |  West Bengal|       Alipurduar|     7705|     7616|      86|    0|  null|2021-02-24|
    |  West Bengal|          Bankura|    11940|    11788|      92|    0|  null|2021-02-24|
    |  West Bengal|          Birbhum|    10035|     9876|      89|    0|  null|2021-02-24|
    |  West Bengal|      Cooch Behar|    11835|    11756|      72|    0|  null|2021-02-24|
    |  West Bengal| Dakshin Dinajpur|     8179|     8099|      74|    0|  null|2021-02-24|
    |  West Bengal|       Darjeeling|    18423|    18155|     203|    0|  null|2021-02-24|
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    only showing top 20 rows
    
    
    Process finished with exit code 0

    We have seen how to read and write data in Spark. But Spark is not the only way to connect to databases, right? There are multiple ways to access databases and achieve parallel reads and writes. We will discuss them in the following sections, focusing mainly on reading and writing from Python.

    Single Thread Python Program: 

    import traceback
    import psycopg2
    import pandas as pd
    
    class PostgresDbClient:
        def __init__(self, postgres_hostname, postgres_jdbcport, postgres_dbname, username, password):
            self.db_host = postgres_hostname
            self.db_port = postgres_jdbcport
            self.db_name = postgres_dbname
            self.db_user = username
            self.db_pass = password
    
        def create_conn(self):
            conn = None
            try:
                print('Connecting to the Postgres database...')
                conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(self.db_name, self.db_user, self.db_host, self.db_pass, self.db_port))
                print('Successfully connected to the Postgres database...')
            except Exception as e:
                print("Cannot connect to Postgres.")
                print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
            return conn
    
        def read(self, query):
            try:
                conn = self.create_conn()
                cursor = conn.cursor()
                print(f"Reading data !!!")
                cursor.execute(query)
                data = cursor.fetchall()
                print(f"Read Data !!!")
                cursor.close()
                conn.close()
                return data
            except Exception as e:
                print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
    
    
    if __name__ == "__main__":
        hostname = "localhost"
        jdbc_port = 5432
        dbname = "postgres"
        username = "postgres"
        password = "pass@123"
        table_name = "covid_data"
        query = f"select * from {table_name}"
    
        db_client = PostgresDbClient(postgres_hostname=hostname, postgres_jdbcport=jdbc_port, postgres_dbname=dbname, username=username, password=password)
        data = pd.DataFrame(db_client.read(query))
        print(data)

    To integrate Postgres with Python, there are several libraries or adapters we can use, but Psycopg is the most widely used adapter. First of all, you will need to install the psycopg2 library; Psycopg2 is a slightly updated version of the Psycopg adapter. You can install it using pip or any other way you are comfortable with.

    To connect to the Postgres database, we need a hostname, port, database name, username, and password. We store all these details as attributes of the class. The create_conn() method establishes a connection to the Postgres database using the connect() method of the psycopg2 module and returns a connection object.
    In the read method, we call create_conn() to get a connection object and use it to create a cursor. The cursor is bound to the connection for its lifetime and executes all commands or queries against the database. Using the cursor, we execute a read query; the rows returned can then be fetched with the fetchall() method. Finally, we close the cursor and the connection.

    To run the program, we specify the database details and the query, create a PostgresDbClient object, and call its read method. The read method returns the data, which we convert into tabular form using pandas.

    This implementation is very straightforward: the program runs as a single process and fetches all the data using the system’s resources (CPU, memory, etc.). The drawback of this approach is that if the program uses, say, 30 percent of the available CPU and memory, the remaining 70 percent sits idle. We can improve utilization with multithreading or multiprocessing.

    Output of Program:

    Connecting to the Postgres database...
    Successfully connected to the Postgres database...
    Reading data !!!
    Read Data !!!
                                  0              1    2   3  4  5     6           7
    0   Andaman and Nicobar Islands        Unknown   33  11  0  0  None  2020-04-26
    1                Andhra Pradesh      Anantapur   53  14  4  0  None  2020-04-26
    2                Andhra Pradesh       Chittoor   73  13  0  0  None  2020-04-26
    3                Andhra Pradesh  East Godavari   39  12  0  0  None  2020-04-26
    4                Andhra Pradesh         Guntur  214  29  8  0  None  2020-04-26
    ..                          ...            ...  ...  .. .. ..   ...         ...
    95                        Bihar         Araria    1   0  0  0  None  2020-04-30
    96                        Bihar          Arwal    4   0  0  0  None  2020-04-30
    97                        Bihar     Aurangabad    8   0  0  0  None  2020-04-30
    98                        Bihar          Banka    3   0  0  0  None  2020-04-30
    99                        Bihar      Begusarai   11   5  0  0  None  2020-04-30
    
    [100 rows x 8 columns]
    
    Process finished with exit code 0

    Multi-Thread Python Program:

    In the previous section, we discussed the drawback of a single process and single-thread implementation. Let’s get started with how to maximize resource usage. Before getting into multithreading, let’s understand a few basic but important concepts.

    What is a process?

    When you execute any program, the operating system loads it into memory and starts executing it. This running instance of a program is called a process. Computing and memory resources are allocated to each process separately.

    What is a thread?

    A thread is a sequential flow of execution within a process. Every process has at least one thread, usually called the main thread. Unlike separate processes, multiple threads can share the same computing and memory resources.

    What is multithreading?

    This is when a process has multiple threads, along with the main thread, and these threads run independently but concurrently using the same computing and memory resources associated with the process. Such a program is called a multithreaded program or process. Multithreading uses resources very efficiently, which results in maximizing performance.

    What is multiprocessing?

    When multiple processes run independently, with separate resources associated with each process, it is called multiprocessing. Multiprocessing is achieved with multiple processors running separate processes on each processor. 
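    As a minimal, self-contained illustration (separate from the database example), Python's multiprocessing module spreads CPU-bound work across worker processes:

```python
from multiprocessing import Pool

def square(n):
    # CPU-bound work, executed in a separate worker process
    return n * n

if __name__ == "__main__":
    # Each worker is a full process with its own memory and interpreter
    with Pool(processes=4) as pool:
        print(pool.map(square, [1, 2, 3, 4]))  # -> [1, 4, 9, 16]
```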

    Let’s get back to our program. The connection and read methods are exactly the same as in the previous section. We also have one new function, get_thread(). Note that it is a function, not a method: it is defined globally, outside the class, and acts as a wrapper for calling the read method of PostgresDbClient. (Strictly speaking, a bound method can also be passed to a thread; a standalone wrapper simply keeps the threading code separate from the database client.) Don’t worry if this seems arbitrary; it is just how we chose to structure the code.

    To run the program, we have specified the Postgres database details and queries. In the previous approach, we fetched all the data from the table with one thread only. In this approach, the plan is to fetch one day of data using one thread so that we can maximize resource utilization. Here, each query reads one day’s worth of data from the table using one thread. Having 5 queries will fetch 5 days of data, and 5 threads will be running concurrently.

    To create a thread in Python, we use the Thread() constructor from the threading library, passing it the function we want to run and that function’s arguments. Thread() creates a new thread object, but the thread has not yet started; to start it, we call the start() method. In our program, we start 5 threads. If you execute the program multiple times, you will get differently ordered output: some data is fetched earlier, some later, and the order changes from run to run. This is because scheduling is handled by the operating system, and the output order depends on which thread the OS gives resources to at any moment. To understand how this works in detail, you would need to dig into operating systems concepts.

    In our use case, we are just printing the data to the console. There are multiple ways to store it instead. One simple way is to define a global variable and store the results in it, but then we need synchronization, because multiple threads accessing the same global variable can lead to race conditions. Another way is to subclass the Thread class and save the data in an attribute of your custom class; again, you need to make sure access to shared state is synchronized.
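    Alternatively, the standard library's queue.Queue does the locking for you. Below is a minimal sketch with stand-in data in place of real database reads; worker() and its arguments are illustrative and not part of the original program:

```python
import threading
import queue

results = queue.Queue()  # thread-safe FIFO; handles its own locking

def worker(thread_id, rows):
    # Stand-in for db_client.read(query); `rows` is fake sample data
    results.put((thread_id, rows))

threads = [threading.Thread(target=worker, args=(i, [i] * 3)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

collected = dict(results.get() for _ in range(results.qsize()))
print(sorted(collected))  # -> [0, 1, 2, 3, 4], regardless of arrival order
```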

    So, whichever method you use to store the data in a shared variable, you need synchronization, and synchronization serializes the threads around that shared state, which is exactly the sequential processing we are trying to avoid.
    To avoid synchronization, we can write the data directly to the target: the same thread that reads the data writes it back to the target database. This way we avoid shared state and still persist the data for future use. The function can look like the one below, where db_client.write(data) is a function that writes the data to a database.

    def get_thread(thread_id, db_client, query):
        print(f"Starting thread id {thread_id}")
        data = pd.DataFrame(db_client.read(query))
        print(f"Thread {thread_id} data ", data, sep="\n")
        db_client.write(data)
        print(f"Stopping thread id {thread_id}")

    Python Program:

    import threading
    import traceback
    import psycopg2
    import pandas as pd
    
    class PostgresDbClient:
        def __init__(self, postgres_hostname, postgres_jdbcport, postgres_dbname, username, password):
            self.db_host = postgres_hostname
            self.db_port = postgres_jdbcport
            self.db_name = postgres_dbname
            self.db_user = username
            self.db_pass = password
    
        def create_conn(self):
            conn = None
            try:
                print('Connecting to the Postgres database...')
                conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(self.db_name, self.db_user, self.db_host, self.db_pass, self.db_port))
                print('Successfully connected to the Postgres database...')
            except Exception as e:
                print("Cannot connect to Postgres.")
                print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
            return conn
    
        def read(self, query):
            try:
                conn = self.create_conn()
                cursor = conn.cursor()
                print(f"Reading data !!!")
                cursor.execute(query)
                data = cursor.fetchall()
                print(f"Read Data !!!")
                cursor.close()
                conn.close()
                return data
            except Exception as e:
                print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
    
    
    def get_thread(thread_id, db_client, query):
        print(f"Starting thread id {thread_id}")
        data = pd.DataFrame(db_client.read(query))
        print(f"Thread {thread_id} data ", data, sep="\n")
        print(f"Stopping thread id {thread_id}")
    
    
    if __name__ == "__main__":
        hostname = "localhost"
        jdbc_port = 5432
        dbname = "postgres"
        username = "postgres"
        password = "pass@123"
        table_name = "covid_data"
        query = f"select * from {table_name}"
    
        partition_column = 'date'

        # One query per day, so each of the 5 threads fetches one day's worth of data
        query1 = f"select * from {table_name} where {partition_column} = '2020-04-26'"
        query2 = f"select * from {table_name} where {partition_column} = '2020-04-27'"
        query3 = f"select * from {table_name} where {partition_column} = '2020-04-28'"
        query4 = f"select * from {table_name} where {partition_column} = '2020-04-29'"
        query5 = f"select * from {table_name} where {partition_column} = '2020-04-30'"
    
        db_client = PostgresDbClient(postgres_hostname=hostname, postgres_jdbcport=jdbc_port, postgres_dbname=dbname, username=username, password=password)
        threads = []
        for i, q in enumerate([query1, query2, query3, query4, query5], start=1):
            t = threading.Thread(target=get_thread, args=(i, db_client, q))
            t.start()
            threads.append(t)
        # wait for all threads to complete
        for t in threads:
            t.join()

    Output of Program:

    Starting thread id 1
    Connecting to the Postgres database...
    Starting thread id 2
    Connecting to the Postgres database...
    Starting thread id 3
    Connecting to the Postgres database...
    Starting thread id 4
    Connecting to the Postgres database...
    Starting thread id 5
    Connecting to the Postgres database...
    Successfully connected to the Postgres database...Successfully connected to the Postgres database...Successfully connected to the Postgres database...
    Reading data !!!
    
    Reading data !!!
    
    Reading data !!!
    Successfully connected to the Postgres database...
    Reading data !!!
    Successfully connected to the Postgres database...
    Reading data !!!
    Read Data !!!
    Read Data !!!
    Read Data !!!
    Read Data !!!
    Read Data !!!
    Thread 2 data 
    Thread 3 data 
    Thread 1 data 
    Thread 5 data 
    Thread 4 data 
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-27
    1                Andhra Pradesh       Anantapur   53  ...  0  None  2020-04-27
    2                Andhra Pradesh        Chittoor   73  ...  0  None  2020-04-27
    3                Andhra Pradesh   East Godavari   39  ...  0  None  2020-04-27
    4                Andhra Pradesh          Guntur  237  ...  0  None  2020-04-27
    5                Andhra Pradesh         Krishna  210  ...  0  None  2020-04-27
    6                Andhra Pradesh         Kurnool  292  ...  0  None  2020-04-27
    7                Andhra Pradesh        Prakasam   56  ...  0  None  2020-04-27
    8                Andhra Pradesh  S.P.S. Nellore   79  ...  0  None  2020-04-27
    9                Andhra Pradesh      Srikakulam    4  ...  0  None  2020-04-27
    10               Andhra Pradesh   Visakhapatnam   22  ...  0  None  2020-04-27
    11               Andhra Pradesh   West Godavari   54  ...  0  None  2020-04-27
    12               Andhra Pradesh   Y.S.R. Kadapa   58  ...  0  None  2020-04-27
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-27
    14                        Assam         Unknown   36  ...  0  None  2020-04-27
    15                        Bihar           Arwal    4  ...  0  None  2020-04-27
    16                        Bihar      Aurangabad    7  ...  0  None  2020-04-27
    17                        Bihar           Banka    2  ...  0  None  2020-04-27
    18                        Bihar       Begusarai    9  ...  0  None  2020-04-27
    19                        Bihar       Bhagalpur    5  ...  0  None  2020-04-27
    
    [20 rows x 8 columns]
    Stopping thread id 2
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-26
    1                Andhra Pradesh       Anantapur   53  ...  0  None  2020-04-26
    2                Andhra Pradesh        Chittoor   73  ...  0  None  2020-04-26
    3                Andhra Pradesh   East Godavari   39  ...  0  None  2020-04-26
    4                Andhra Pradesh          Guntur  214  ...  0  None  2020-04-26
    5                Andhra Pradesh         Krishna  177  ...  0  None  2020-04-26
    6                Andhra Pradesh         Kurnool  279  ...  0  None  2020-04-26
    7                Andhra Pradesh        Prakasam   56  ...  0  None  2020-04-26
    8                Andhra Pradesh  S.P.S. Nellore   72  ...  0  None  2020-04-26
    9                Andhra Pradesh      Srikakulam    3  ...  0  None  2020-04-26
    10               Andhra Pradesh   Visakhapatnam   22  ...  0  None  2020-04-26
    11               Andhra Pradesh   West Godavari   51  ...  0  None  2020-04-26
    12               Andhra Pradesh   Y.S.R. Kadapa   58  ...  0  None  2020-04-26
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-26
    14                        Assam         Unknown   36  ...  0  None  2020-04-26
    15                        Bihar           Arwal    4  ...  0  None  2020-04-26
    16                        Bihar      Aurangabad    2  ...  0  None  2020-04-26
    17                        Bihar           Banka    2  ...  0  None  2020-04-26
    18                        Bihar       Begusarai    9  ...  0  None  2020-04-26
    19                        Bihar       Bhagalpur    5  ...  0  None  2020-04-26
    
    [20 rows x 8 columns]
    Stopping thread id 1
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-28
    1                Andhra Pradesh       Anantapur   54  ...  0  None  2020-04-28
    2                Andhra Pradesh        Chittoor   74  ...  0  None  2020-04-28
    3                Andhra Pradesh   East Godavari   39  ...  0  None  2020-04-28
    4                Andhra Pradesh          Guntur  254  ...  0  None  2020-04-28
    5                Andhra Pradesh         Krishna  223  ...  0  None  2020-04-28
    6                Andhra Pradesh         Kurnool  332  ...  0  None  2020-04-28
    7                Andhra Pradesh        Prakasam   56  ...  0  None  2020-04-28
    8                Andhra Pradesh  S.P.S. Nellore   82  ...  0  None  2020-04-28
    9                Andhra Pradesh      Srikakulam    4  ...  0  None  2020-04-28
    10               Andhra Pradesh   Visakhapatnam   22  ...  0  None  2020-04-28
    11               Andhra Pradesh   West Godavari   54  ...  0  None  2020-04-28
    12               Andhra Pradesh   Y.S.R. Kadapa   65  ...  0  None  2020-04-28
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-28
    14                        Assam         Unknown   38  ...  0  None  2020-04-28
    15                        Bihar          Araria    1  ...  0  None  2020-04-28
    16                        Bihar           Arwal    4  ...  0  None  2020-04-28
    17                        Bihar      Aurangabad    7  ...  0  None  2020-04-28
    18                        Bihar           Banka    3  ...  0  None  2020-04-28
    19                        Bihar       Begusarai    9  ...  0  None  2020-04-28
    
    [20 rows x 8 columns]
    Stopping thread id 3
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-30
    1                Andhra Pradesh       Anantapur   61  ...  0  None  2020-04-30
    2                Andhra Pradesh        Chittoor   80  ...  0  None  2020-04-30
    3                Andhra Pradesh   East Godavari   42  ...  0  None  2020-04-30
    4                Andhra Pradesh          Guntur  287  ...  0  None  2020-04-30
    5                Andhra Pradesh         Krishna  246  ...  0  None  2020-04-30
    6                Andhra Pradesh         Kurnool  386  ...  0  None  2020-04-30
    7                Andhra Pradesh        Prakasam   60  ...  0  None  2020-04-30
    8                Andhra Pradesh  S.P.S. Nellore   84  ...  0  None  2020-04-30
    9                Andhra Pradesh      Srikakulam    5  ...  0  None  2020-04-30
    10               Andhra Pradesh   Visakhapatnam   23  ...  0  None  2020-04-30
    11               Andhra Pradesh   West Godavari   56  ...  0  None  2020-04-30
    12               Andhra Pradesh   Y.S.R. Kadapa   73  ...  0  None  2020-04-30
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-30
    14                        Assam         Unknown   43  ...  0  None  2020-04-30
    15                        Bihar          Araria    1  ...  0  None  2020-04-30
    16                        Bihar           Arwal    4  ...  0  None  2020-04-30
    17                        Bihar      Aurangabad    8  ...  0  None  2020-04-30
    18                        Bihar           Banka    3  ...  0  None  2020-04-30
    19                        Bihar       Begusarai   11  ...  0  None  2020-04-30
    
    [20 rows x 8 columns]
    Stopping thread id 5
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-29
    1                Andhra Pradesh       Anantapur   58  ...  0  None  2020-04-29
    2                Andhra Pradesh        Chittoor   77  ...  0  None  2020-04-29
    3                Andhra Pradesh   East Godavari   40  ...  0  None  2020-04-29
    4                Andhra Pradesh          Guntur  283  ...  0  None  2020-04-29
    5                Andhra Pradesh         Krishna  236  ...  0  None  2020-04-29
    6                Andhra Pradesh         Kurnool  343  ...  0  None  2020-04-29
    7                Andhra Pradesh        Prakasam   60  ...  0  None  2020-04-29
    8                Andhra Pradesh  S.P.S. Nellore   82  ...  0  None  2020-04-29
    9                Andhra Pradesh      Srikakulam    5  ...  0  None  2020-04-29
    10               Andhra Pradesh   Visakhapatnam   23  ...  0  None  2020-04-29
    11               Andhra Pradesh   West Godavari   56  ...  0  None  2020-04-29
    12               Andhra Pradesh   Y.S.R. Kadapa   69  ...  0  None  2020-04-29
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-29
    14                        Assam         Unknown   38  ...  0  None  2020-04-29
    15                        Bihar          Araria    1  ...  0  None  2020-04-29
    16                        Bihar           Arwal    4  ...  0  None  2020-04-29
    17                        Bihar      Aurangabad    8  ...  0  None  2020-04-29
    18                        Bihar           Banka    3  ...  0  None  2020-04-29
    19                        Bihar       Begusarai   11  ...  0  None  2020-04-29
    
    [20 rows x 8 columns]
    Stopping thread id 4
    
    Process finished with exit code 0

    Note that in this blog we have used a hardcoded password string, which is definitely not how passwords should be handled. Use a secrets manager, environment variables, .env files, etc., to supply passwords; never hardcode them in a production environment.

    Conclusion 

    After going through the above blog, you should now be more familiar with how to perform read and write operations on databases using Spark, Python, and multithreading. You also know what multiprocessing and multithreading are, and you can analyze the best way to carry out read and write operations on a database based on your requirements.

    In general, if you have a small amount of data, a simple Python approach to reading and writing is enough. For a relatively large amount of data, use a multithreaded approach or a single-partition Spark approach. If you have a huge amount of data, where reading millions of records per second is a requirement, use the Spark multi-partition approach. In the end, the choice depends on your requirements and the resources available.

  • A Beginner’s Guide to Python Tornado

    The web is a big place now. We need to support thousands of clients at a time, and this is where Tornado comes in. Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed.

    Tornado uses non-blocking network I/O. Because of this, it can handle thousands of active server connections. It is a saviour for applications that rely on long polling or maintain a large number of active connections.

    Tornado is not like most Python frameworks. It is not based on WSGI, though it supports some WSGI features through the `tornado.wsgi` module. It uses an event-loop design that makes Tornado’s request execution fast.

    What is a Synchronous Program?

    A synchronous function blocks while it performs its computation and returns once done. A function may block for many reasons: network I/O, disk I/O, mutexes, etc.

    Application performance depends on how efficiently the application uses CPU cycles, which is why blocking statements and calls must be taken seriously. Consider password hashing functions like bcrypt, which by design use hundreds of milliseconds of CPU time, far more than a typical network or disk access. Because the CPU is busy rather than idle during such a call, there is nothing to gain from making it asynchronous.
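    When such a CPU-heavy call has to coexist with an event loop, the usual pattern is to hand it to an executor rather than pretend it is asynchronous. Below is a minimal standard-library sketch using pbkdf2 as a stand-in for bcrypt; since current Tornado runs its IOLoop on top of asyncio, the same pattern applies there:

```python
import asyncio
import hashlib

def slow_hash(password: str) -> str:
    # CPU-bound stand-in for bcrypt: deliberately many iterations
    return hashlib.pbkdf2_hmac("sha256", password.encode(), b"salt", 100_000).hex()

async def handle_request(password: str) -> str:
    loop = asyncio.get_running_loop()
    # run_in_executor(None, ...) uses the default thread pool, so the
    # event loop stays free to serve other requests in the meantime
    return await loop.run_in_executor(None, slow_hash, password)

digest = asyncio.run(handle_request("secret"))
print(len(digest))  # -> 64 (hex of a 32-byte SHA-256-based hash)
```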

    A function can be blocking in some respects and non-blocking in others. In the context of Tornado, we generally consider blocking due to network and disk I/O, although all kinds of blocking should be minimized.

    What is an Asynchronous Program?

    An asynchronous program, in Tornado’s sense, is built on the following ideas:

    1) Single-threaded architecture:

        This means it cannot run computation-heavy tasks in parallel.

    2) I/O concurrency:

        It can hand over I/O tasks to the operating system and move on to the next task, achieving I/O concurrency.

    3) epoll/ kqueue:

        Underlying OS-level constructs that let an application receive events on file descriptors and other I/O sources.

    4) Event loop:

        The event loop uses epoll or kqueue to check whether any events have occurred and executes the callbacks waiting for those network events.

    Asynchronous vs Synchronous Web Framework:

    In the synchronous model, each request or task is handed to a thread or routine, and when it finishes, the result is handed back to the caller. Managing things this way is easy, but creating new threads carries significant overhead.

    On the other hand, an asynchronous framework like Node.js uses a single-threaded model, so there is far less overhead, but at the cost of added complexity.

    Imagine thousands of requests coming in to a server that uses an event loop and callbacks. Until each request is processed, the server has to efficiently store and manage its state so that it can map the callback’s result back to the right client.

    Node.js vs Tornado

    Most of these comparison points come down to the underlying programming language rather than the framework:

    • Node.js has one big advantage: virtually all of its libraries are asynchronous. Python has plenty of packages available, but very few of them are asynchronous
    • Since Node.js is a JavaScript runtime, JS can be used for both the front end and the back end, so developers can keep a single codebase and share the same utility libraries
    • Google’s V8 engine makes Node.js faster than Tornado, but many Python libraries are written in C and can be faster alternatives.

    A Simple ‘Hello World’ Example

    import tornado.ioloop
    import tornado.web
    
    class MainHandler(tornado.web.RequestHandler):
        def get(self):
            self.write("Hello, world")
    
    def make_app():
        return tornado.web.Application([
            (r"/", MainHandler),
        ])
    
    if __name__ == "__main__":
        app = make_app()
        app.listen(8888)
        tornado.ioloop.IOLoop.current().start()

    Note: This example does not use any asynchronous features.

    Using the AsyncHTTPClient module, we can make REST calls asynchronously.

    from tornado.httpclient import AsyncHTTPClient
    from tornado import gen
    
    @gen.coroutine
    def async_fetch_gen(url):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch(url)
        raise gen.Return(response.body)

    As you can see, `yield http_client.fetch(url)` runs as a coroutine.
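    On modern Tornado (5+), which runs on the asyncio event loop, the same fetch is normally written with native async/await instead of `@gen.coroutine`. The sketch below shows the shape of that code; the network call is stubbed out with a hypothetical `fake_fetch` so it runs standalone.

    ```python
    import asyncio

    async def fake_fetch(url: str) -> bytes:
        # Stand-in for a real HTTP fetch; yields to the event loop once,
        # as a real network call would while waiting for the response.
        await asyncio.sleep(0)
        return b"<body of " + url.encode() + b">"

    async def async_fetch(url: str) -> bytes:
        # With the real client this would be:
        #   response = await AsyncHTTPClient().fetch(url)
        #   return response.body
        return await fake_fetch(url)

    body = asyncio.run(async_fetch("http://example.com"))
    print(body)
    ```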

    Complex Example of Tornado Async

    Please have a look at Asynchronous Request handler.

    WebSockets Using Tornado:

    Tornado has a built-in package for WebSockets that can easily be used with coroutines to achieve concurrency. Here is one example:

    import logging
    import tornado.escape
    import tornado.ioloop
    import tornado.options
    import tornado.web
    import tornado.websocket
    from tornado.options import define, options
    from tornado.httpserver import HTTPServer
    
    define("port", default=8888, help="run on the given port", type=int)
    
    
    
    def isPrime(num):
        """
        Simple worker; stands in for a heavy I/O or network call
        """
        if num <= 1:
            return "is not a prime number"
        # checking up to sqrt(num) is sufficient (the original range
        # missed small composites like 4)
        for i in range(2, int(num ** 0.5) + 1):
            if num % i == 0:
                return "is not a prime number"
        return "is a prime number"
    
    class Application(tornado.web.Application):
        def __init__(self):
            handlers = [(r"/chatsocket", TornadoWebSocket)]
            super(Application, self).__init__(handlers)
    
    class TornadoWebSocket(tornado.websocket.WebSocketHandler):
        clients = set()
    
        # enable cross domain origin
        def check_origin(self, origin):
            return True
    
        def open(self):
            TornadoWebSocket.clients.add(self)
    
        # when client closes connection
        def on_close(self):
            TornadoWebSocket.clients.remove(self)
    
        @classmethod
        def send_updates(cls, producer, result):
    
            for client in cls.clients:
    
                # check if result is mapped to correct sender
                if client == producer:
                    try:
                        client.write_message(result)
                except Exception:
                        logging.error("Error sending message", exc_info=True)
    
        def on_message(self, message):
            try:
                num = int(message)
            except ValueError:
                TornadoWebSocket.send_updates(self, "Invalid input")
                return
            TornadoWebSocket.send_updates(self, isPrime(num))
    
    def start_websockets():
        tornado.options.parse_command_line()
        app = Application()
        server = HTTPServer(app)
        server.listen(options.port)
        tornado.ioloop.IOLoop.current().start()
    
    
    
    if __name__ == "__main__":
        start_websockets()

    One can use a WebSocket client application to connect to the server; the message can be any integer. After processing, the client receives a result indicating whether the integer is prime or not.  
    Here is one more example of the actual async features of Tornado. Many will find it similar to Golang’s goroutines and channels.

    In this example, we can start worker(s) that listen on a `tornado.queues.Queue`. This queue is asynchronous and very similar to the one in the asyncio package.

    # Example 1
    from tornado import gen, queues
    from tornado.ioloop import IOLoop
    
    @gen.coroutine
    def consumer(queue, num_expected):
        for _ in range(num_expected):
            # heavy I/O or network task
            print('got: %s' % (yield queue.get()))
    
    
    @gen.coroutine
    def producer(queue, num_items):
        for i in range(num_items):
            print('putting %s' % i)
            yield queue.put(i)
    
    @gen.coroutine
    def main():
        """
        Starts producer and consumer and waits till they finish
        """
        yield [producer(q, producer_num_items), consumer(q, producer_num_items)]
    
    queue_size = 1
    producer_num_items = 5
    q = queues.Queue(queue_size)
    
    results = IOLoop.current().run_sync(main)
    
    
    # Output:
    # putting 0
    # putting 1
    # got: 0
    # got: 1
    # putting 2
    # putting 3
    # putting 4
    # got: 2
    # got: 3
    # got: 4
    
    
    # Example 2
    # Condition
    # A condition allows one or more coroutines to wait until notified.
    from tornado import gen
    from tornado.ioloop import IOLoop
    from tornado.locks import Condition
    
    my_condition = Condition()
    
    @gen.coroutine
    def waiter():
        print("I'll wait right here")
        yield my_condition.wait()
        print("Received notification now doing my things")
    
    @gen.coroutine
    def notifier():
        yield gen.sleep(60)
        print("About to notify")
        my_condition.notify()
        print("Done notifying")
    
    @gen.coroutine
    def runner():
        # Wait for waiter() and notifier() in parallel
        yield([waiter(), notifier()])
    
    results = IOLoop.current().run_sync(runner)
    
    
    # output:
    
    # I'll wait right here
    # About to notify
    # Done notifying
    # Received notification now doing my things
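    The producer/consumer pattern in Example 1 maps almost one-to-one onto `asyncio.Queue`; here is the same idea as a self-contained asyncio sketch (the `out` list is added here just to capture results).

    ```python
    import asyncio

    async def producer(queue: asyncio.Queue, num_items: int) -> None:
        for i in range(num_items):
            await queue.put(i)  # suspends (cooperatively) when the queue is full

    async def consumer(queue: asyncio.Queue, num_expected: int, out: list) -> None:
        for _ in range(num_expected):
            out.append(await queue.get())

    async def main() -> list:
        q = asyncio.Queue(maxsize=1)  # size 1 forces the coroutines to interleave
        out = []
        await asyncio.gather(producer(q, 5), consumer(q, 5, out))
        return out

    items = asyncio.run(main())
    print(items)  # [0, 1, 2, 3, 4]
    ```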

    Conclusion

    1) Asynchronous frameworks are not of much use when most of the computation is CPU-centric rather than I/O-bound.

    2) Thanks to the single-thread-per-core model and the event loop, such a framework can manage thousands of active client connections.

    3) Many say Django is too big, Flask is too small, and Tornado is just right:)

  • An Introduction to Asynchronous Programming in Python

    Introduction

    Asynchronous programming is a type of parallel programming in which a unit of work is allowed to run separately from the primary application thread. When the work is complete, it notifies the main thread about completion or failure of the worker thread. There are numerous benefits to using it, such as improved application performance and enhanced responsiveness.

    Asynchronous programming has been gaining a lot of attention in the past few years, and for good reason. Although it can be more difficult than the traditional linear style, it is also much more efficient.

    For example, instead of waiting for an HTTP request to finish before continuing execution, with Python async coroutines you can submit the request and do other work that’s waiting in a queue while waiting for the HTTP request to finish.

    Asynchronicity seems to be a big reason why Node.js is so popular for server-side programming. Much of the code we write, especially in I/O-heavy applications like websites, depends on external resources. This could be anything from a remote database call to POSTing to a REST service. As soon as you ask for any of these resources, your code is left waiting with nothing to do. With asynchronous programming, you allow your code to handle other tasks while waiting for those resources to respond.

    How Does Python Do Multiple Things At Once?

    1. Multiple Processes

    The most obvious way is to use multiple processes. From the terminal, you can start your script two, three, four, ten times, and all the scripts will run independently and at the same time. The operating system underneath will take care of sharing your CPU resources among all those instances. Alternatively, you can use the multiprocessing library, which supports spawning processes, as shown in the example below.

    from multiprocessing import Process
    
    
    def print_func(continent='Asia'):
        print('The name of continent is : ', continent)
    
    if __name__ == "__main__":  # confirms that the code is under main function
        names = ['America', 'Europe', 'Africa']
        procs = []
        proc = Process(target=print_func)  # instantiating without any argument
        procs.append(proc)
        proc.start()
    
        # instantiating process with arguments
        for name in names:
            # print(name)
            proc = Process(target=print_func, args=(name,))
            procs.append(proc)
            proc.start()
    
        # complete the processes
        for proc in procs:
            proc.join()

    Output:

    The name of continent is :  Asia
    The name of continent is :  America
    The name of continent is :  Europe
    The name of continent is :  Africa

    2. Multiple Threads

    The next way to run multiple things at once is to use threads. A thread is a line of execution, much like a process, but you can have multiple threads in the context of one process, and they all share access to common resources. Because of this, it is difficult to write correct threading code. And again, the operating system does all the heavy lifting of sharing the CPU, but the global interpreter lock (GIL) allows only one thread to run Python code at a given time, even when multiple threads are running. So, in CPython, the GIL prevents multi-core concurrency: you are effectively running on a single core even though the machine may have two, four, or more.

    import threading
     
    def print_cube(num):
        """
        function to print cube of given num
        """
        print("Cube: {}".format(num * num * num))
     
    def print_square(num):
        """
        function to print square of given num
        """
        print("Square: {}".format(num * num))
     
    if __name__ == "__main__":
        # creating thread
        t1 = threading.Thread(target=print_square, args=(10,))
        t2 = threading.Thread(target=print_cube, args=(10,))
     
        # starting thread 1
        t1.start()
        # starting thread 2
        t2.start()
     
        # wait until thread 1 is completely executed
        t1.join()
        # wait until thread 2 is completely executed
        t2.join()
     
        # both threads completely executed
        print("Done!")

    Output:

    Square: 100
    Cube: 1000
    Done!
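    The paragraph above notes that shared access to common resources is what makes threaded code hard to get right. A quick sketch of the standard fix: guard each shared update with a `threading.Lock` so increments cannot race.

    ```python
    import threading

    counter = 0
    lock = threading.Lock()

    def bump(n: int) -> None:
        global counter
        for _ in range(n):
            with lock:          # without the lock, updates may be lost
                counter += 1

    threads = [threading.Thread(target=bump, args=(100_000,)) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(counter)  # 400000
    ```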

    3. Coroutines using yield:

    Coroutines are generalizations of subroutines. They are used for cooperative multitasking, where a process voluntarily yields (gives away) control periodically, or when idle, to enable multiple applications to run simultaneously. Coroutines are similar to generators, but with a few extra methods and a slight change in how we use the yield statement. Generators produce data for iteration, while coroutines can also consume data.

    def print_name(prefix):
        print("Searching prefix:{}".format(prefix))
        try:
            while True:
                # yield is used to create the coroutine
                name = (yield)
                if prefix in name:
                    print(name)
        except GeneratorExit:
            print("Closing coroutine!!")
    
    corou = print_name("Dear")
    corou.__next__()
    corou.send("James")
    corou.send("Dear James")
    corou.close()

    Output:

    Searching prefix:Dear
    Dear James
    Closing coroutine!!

    4. Asynchronous Programming

    The fourth way is asynchronous programming, where the OS does not participate. As far as the OS is concerned, you have one process with a single thread inside it, yet you are able to do multiple things at once. So, what’s the trick?

    The answer is asyncio

    asyncio is the new concurrency module introduced in Python 3.4. It is designed to use coroutines and futures to simplify asynchronous code and make it almost as readable as synchronous code as there are no callbacks.

    asyncio uses different constructs: event loops, coroutines and futures.

    • An event loop manages and distributes the execution of different tasks. It registers them and handles distributing the flow of control between them.
    • Coroutines (covered above) are special functions that work similarly to Python generators; on await, they release the flow of control back to the event loop. A coroutine needs to be scheduled on the event loop; once scheduled, it is wrapped in a Task, which is a type of Future.
    • Futures represent the result of a task that may or may not have been executed. This result may be an exception.
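    The three constructs can be seen working together in a small, self-contained sketch (`waiter` and `setter` are hypothetical names, not from the article): the event loop runs two Tasks, one coroutine awaits a Future, and the other sets its result.

    ```python
    import asyncio

    async def waiter(fut: asyncio.Future) -> str:
        return await fut  # suspends until the future has a result

    async def setter(fut: asyncio.Future) -> None:
        await asyncio.sleep(0)  # let the waiter start first
        fut.set_result("ready")

    async def main() -> str:
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        # gather() wraps each coroutine in a Task (itself a kind of Future).
        results = await asyncio.gather(waiter(fut), setter(fut))
        return results[0]

    value = asyncio.run(main())
    print(value)  # ready
    ```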

    Using asyncio, you can structure your code so that subtasks are defined as coroutines, and schedule them as you please, including simultaneously. Coroutines contain yield points: places where a context switch can happen if other tasks are pending, but will not if no other task is pending.

    A context switch in asyncio represents the event loop yielding the flow of control from one coroutine to the next.
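    A context switch can be made visible with `await asyncio.sleep(0)`, which yields control back to the event loop without actually waiting; the interleaved log shows where control passed between the two coroutines (names here are illustrative).

    ```python
    import asyncio

    log = []

    async def worker(name: str, steps: int) -> None:
        for i in range(steps):
            log.append(f"{name}{i}")
            await asyncio.sleep(0)  # a possible context-switch point

    async def main() -> None:
        await asyncio.gather(worker("a", 2), worker("b", 2))

    asyncio.run(main())
    print(log)
    ```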

    In the example, we run 3 async tasks that query Reddit separately, then extract and print the JSON. We leverage aiohttp, an HTTP client library that ensures even the HTTP request runs asynchronously.

    import signal  
    import sys  
    import asyncio  
    import aiohttp  
    import json
    
    loop = asyncio.get_event_loop()  
    client = aiohttp.ClientSession(loop=loop)
    
    async def get_json(client, url):  
        async with client.get(url) as response:
            assert response.status == 200
            return await response.read()
    
    async def get_reddit_top(subreddit, client):  
        data1 = await get_json(client, 'https://www.reddit.com/r/' + subreddit + '/top.json?sort=top&t=day&limit=5')
    
        j = json.loads(data1.decode('utf-8'))
        for i in j['data']['children']:
            score = i['data']['score']
            title = i['data']['title']
            link = i['data']['url']
            print(str(score) + ': ' + title + ' (' + link + ')')
    
        print('DONE:', subreddit + '\n')
    
    def signal_handler(signal, frame):  
        loop.stop()
        client.close()
        sys.exit(0)
    
    signal.signal(signal.SIGINT, signal_handler)
    
    asyncio.ensure_future(get_reddit_top('python', client))  
    asyncio.ensure_future(get_reddit_top('programming', client))  
    asyncio.ensure_future(get_reddit_top('compsci', client))  
    loop.run_forever()

    Output:

    50: Undershoot: Parsing theory in 1965 (http://jeffreykegler.github.io/Ocean-of-Awareness-blog/individual/2018/07/knuth_1965_2.html)
    12: Question about best-prefix/failure function/primal match table in kmp algorithm (https://www.reddit.com/r/compsci/comments/8xd3m2/question_about_bestprefixfailure_functionprimal/)
    1: Question regarding calculating the probability of failure of a RAID system (https://www.reddit.com/r/compsci/comments/8xbkk2/question_regarding_calculating_the_probability_of/)
    DONE: compsci
    
    336: /r/thanosdidnothingwrong -- banning people with python (https://clips.twitch.tv/AstutePluckyCocoaLitty)
    175: PythonRobotics: Python sample codes for robotics algorithms (https://atsushisakai.github.io/PythonRobotics/)
    23: Python and Flask Tutorial in VS Code (https://code.visualstudio.com/docs/python/tutorial-flask)
    17: Started a new blog on Celery - what would you like to read about? (https://www.python-celery.com)
    14: A Simple Anomaly Detection Algorithm in Python (https://medium.com/@mathmare_/pyng-a-simple-anomaly-detection-algorithm-2f355d7dc054)
    DONE: python
    
    1360: git bundle (https://dev.to/gabeguz/git-bundle-2l5o)
    1191: Which hashing algorithm is best for uniqueness and speed? Ian Boyd's answer (top voted) is one of the best comments I've seen on Stackexchange. (https://softwareengineering.stackexchange.com/questions/49550/which-hashing-algorithm-is-best-for-uniqueness-and-speed)
    430: ARM launches “Facts” campaign against RISC-V (https://riscv-basics.com/)
    244: Choice of search engine on Android nuked by “Anonymous Coward” (2009) (https://android.googlesource.com/platform/packages/apps/GlobalSearch/+/592150ac00086400415afe936d96f04d3be3ba0c)
    209: Exploiting freely accessible WhatsApp data or “Why does WhatsApp web know my phone’s battery level?” (https://medium.com/@juan_cortes/exploiting-freely-accessible-whatsapp-data-or-why-does-whatsapp-know-my-battery-level-ddac224041b4)
    DONE: programming

    Using Redis and Redis Queue (RQ):

    Using asyncio and aiohttp may not always be an option, especially if you are using older versions of Python. Also, there will be scenarios where you want to distribute your tasks across different servers. In that case, we can leverage RQ (Redis Queue), a simple Python library for queueing jobs and processing them in the background with workers. It is backed by Redis, a key/value data store.

    In the example below, we have queued a simple function count_words_at_url using Redis.

    from mymodule import count_words_at_url
    from redis import Redis
    from rq import Queue
    
    
    q = Queue(connection=Redis())
    job = q.enqueue(count_words_at_url, 'http://nvie.com')
    
    
    # mymodule.py
    
    import requests
    
    def count_words_at_url(url):
        """Just an example function that's called async."""
        resp = requests.get(url)
    
        print(len(resp.text.split()))
        return len(resp.text.split())

    Output:

    15:10:45 RQ worker 'rq:worker:EMPID18030.9865' started, version 0.11.0
    15:10:45 *** Listening on default...
    15:10:45 Cleaning registries for queue: default
    15:10:50 default: mymodule.count_words_at_url('http://nvie.com') (a2b7451e-731f-4f31-9232-2b7e3549051f)
    322
    15:10:51 default: Job OK (a2b7451e-731f-4f31-9232-2b7e3549051f)
    15:10:51 Result is kept for 500 seconds

    Conclusion:

    Let’s take the classical example of a chess exhibition, where one of the best chess players competes against many people. If there are 24 games with 24 opponents and the chess master plays all of them synchronously, it will take at least 12 hours (assuming the average game takes 30 moves, the chess master thinks for 5 seconds per move, and the opponent for approximately 55 seconds). But the asynchronous mode gives the chess master the opportunity to make a move and leave the opponent thinking while moving on to the next board and making a move there. This way, one move on all 24 games can be made in 2 minutes, and all of them can be won in just one hour.

    So, this is what’s meant when people say asynchronous is really fast. It’s this kind of fast: the chess master doesn’t play chess any faster, the time is just better used and not wasted on waiting around.

    In this analogy, the chess master is our CPU, and the idea is to make sure the CPU waits the least amount of time possible. It’s about always finding something to do.

    A practical definition of async is that it’s a style of concurrent programming in which tasks release the CPU during waiting periods so that other tasks can use it. In Python, there are several ways to achieve concurrency; based on our requirements, code flow, data manipulation, architecture design and use cases, we can select any of these methods.
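    The chess analogy can even be simulated in a few lines of asyncio. The figures below are made up for the demo (1 ms per move, 50 ms of opponent thinking, 3 boards of 3 moves each); run concurrently, the thinking time overlaps, so the total wall time stays well under the synchronous sum of all the waits.

    ```python
    import asyncio
    import time

    MOVES, BOARDS = 3, 3

    async def play_board(board: int) -> None:
        for _ in range(MOVES):
            await asyncio.sleep(0.001)  # master makes a move
            await asyncio.sleep(0.05)   # opponent thinks; master is free

    async def exhibition() -> None:
        await asyncio.gather(*(play_board(b) for b in range(BOARDS)))

    start = time.perf_counter()
    asyncio.run(exhibition())
    elapsed = time.perf_counter() - start

    synchronous_total = BOARDS * MOVES * 0.051  # if played one board at a time
    print(f"concurrent: {elapsed:.3f}s vs synchronous: {synchronous_total:.3f}s")
    ```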

  • Implementing gRPC In Python: A Step-by-step Guide

    In the last few years, we have seen a great shift in technology, with projects moving towards a “microservice architecture” and away from the old “monolithic architecture”. This approach has done wonders for us.

    As we say, “smaller things are much easier to handle”, so here we have microservices that can be handled conveniently. Different microservices need to interact with each other. I handled this using HTTP API calls, which seemed great and worked for me.

    But is this the perfect way to do things?

    The answer is a resounding, “no,” because we compromised both speed and efficiency here. 

    Then the gRPC framework came into the picture, and it has been a game-changer.

    What is gRPC?

    Quoting the official documentation:

    “gRPC or Google Remote Procedure Call is a modern open-source high-performance RPC framework that can run in any environment. It can efficiently connect services in and across data centers with pluggable support for load balancing, tracing, health checking and authentication.”

     

    Credit: gRPC

    RPCs, or remote procedure calls, are messages sent to a remote system to get a task (or subroutine) executed there.

    Google’s RPC is designed to facilitate smooth and efficient communication between the services. It can be utilized in different ways, such as:

    • Efficiently connecting polyglot services in microservices style architecture
    • Connecting mobile devices, browser clients to backend services
    • Generating efficient client libraries

    Why gRPC? 

    HTTP/2-based transport – It uses the HTTP/2 protocol instead of HTTP/1.1, which provides multiple benefits. One major benefit is that multiple bidirectional streams can be created and sent over a single TCP connection in parallel, making it swift. 

    Auth, tracing, load balancing and health checking – gRPC provides all these features, making it a secure and reliable option to choose.

    Language-independent communication – Two services may be written in different languages, say Python and Golang; gRPC ensures smooth communication between them.

    Use of Protocol Buffers – gRPC uses protocol buffers for defining the type of data (an Interface Definition Language, or IDL) to be sent between the gRPC client and the gRPC server. It also uses them as the message interchange format. 

    Let’s dig a little more into what Protocol Buffers are.

    Protocol Buffers

    Protocol Buffers, like XML, are an efficient and automated mechanism for serializing structured data. They provide a way to define the structure of the data to be transmitted. Google says that protocol buffers are better than XML, as they are:

    • simpler
    • three to ten times smaller
    • 20 to 100 times faster
    • less ambiguous
    • able to generate data access classes that make it easier to use the data programmatically

    Protobufs are defined in .proto files, and defining them is easy. 

    Types of gRPC implementation

    1. Unary RPCs:- This is a simple gRPC which works like a normal function call. It sends a single request declared in the .proto file to the server and gets back a single response from the server.

    rpc HelloServer(RequestMessage) returns (ResponseMessage);

    2. Server streaming RPCs:- The client sends a message declared in the .proto file to the server and gets back a stream of messages to read. The client reads from that stream until there are no more messages.

    rpc HelloServer(RequestMessage) returns (stream ResponseMessage);

    3. Client streaming RPCs:- The client writes a sequence of messages using a write stream and sends them to the server. After all the messages are sent, the client waits for the server to read them all and return a response.

    rpc HelloServer(stream RequestMessage) returns (ResponseMessage);

    4. Bidirectional streaming RPCs:- Both gRPC client and the gRPC server use a read-write stream to send a message sequence. Both operate independently, so gRPC clients and gRPC servers can write and read in any order they like, i.e. the server can read a message then write a message alternatively, wait to receive all messages then write its responses, or perform reads and writes in any other combination.

    rpc HelloServer(stream RequestMessage) returns (stream ResponseMessage);

    **gRPC guarantees the ordering of messages within an individual RPC call. In the case of Bidirectional streaming, the order of messages is preserved in each stream.

    Implementing gRPC in Python

    Currently, gRPC provides support for many languages, like Golang, C++ and Java. I will be focusing on its implementation using Python.

    mkdir grpc_example
    cd grpc_example
    virtualenv -p python3 env
    source env/bin/activate
    pip install grpcio grpcio-tools

    This will install all the required dependencies to implement gRPC.

    Unary gRPC 

    For implementing gRPC services, we need to define three files:-

    • Proto file – The proto file comprises the declaration of the service and is used to generate the stubs (<package_name>_pb2.py and <package_name>_pb2_grpc.py). These are used by the gRPC client and the gRPC server.
    • gRPC client – The client makes a gRPC call to the server to get the response as per the proto file.
    • gRPC Server – The server is responsible for serving requests to the client.
    syntax = "proto3";
    
    package unary;
    
    service Unary{
      // A simple RPC.
      //
      // Obtains the MessageResponse at a given position.
     rpc GetServerResponse(Message) returns (MessageResponse) {}
    
    }
    
    message Message{
     string message = 1;
    }
    
    message MessageResponse{
     string message = 1;
     bool received = 2;
    }

    In the above code, we have declared a service named Unary, which consists of a collection of RPCs. For now, I have implemented a single RPC, GetServerResponse(). It takes an input of type Message and returns a MessageResponse. Below the service declaration, I have declared Message and MessageResponse.

    Once we are done with the creation of the .proto file, we need to generate the stubs. For that, we will execute the below command:-

    python -m grpc_tools.protoc --proto_path=. ./unary.proto --python_out=. --grpc_python_out=.

    Two files are generated named unary_pb2.py and unary_pb2_grpc.py. Using these two stub files, we will implement the gRPC server and the client.

    Implementing the Server

    import grpc
    from concurrent import futures
    import time
    import unary.unary_pb2_grpc as pb2_grpc
    import unary.unary_pb2 as pb2
    
    
    class UnaryService(pb2_grpc.UnaryServicer):
    
        def __init__(self, *args, **kwargs):
            pass
    
        def GetServerResponse(self, request, context):
    
            # get the string from the incoming request
            message = request.message
            result = f'Hello I am up and running received "{message}" message from you'
            result = {'message': result, 'received': True}
    
            return pb2.MessageResponse(**result)
    
    
    def serve():
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        pb2_grpc.add_UnaryServicer_to_server(UnaryService(), server)
        server.add_insecure_port('[::]:50051')
        server.start()
        server.wait_for_termination()
    
    
    if __name__ == '__main__':
        serve()

    In the gRPC server file, there is a GetServerResponse() method which takes `Message` from the client and returns a `MessageResponse` as defined in the proto file.

    The serve() function is called from the main block and makes sure that the server listens at all times. We will run unary_server.py to start the server:

    python3 unary_server.py

    Implementing the Client

    import grpc
    import unary.unary_pb2_grpc as pb2_grpc
    import unary.unary_pb2 as pb2
    
    
    class UnaryClient(object):
        """
        Client for gRPC functionality
        """
    
        def __init__(self):
            self.host = 'localhost'
            self.server_port = 50051
    
            # instantiate a channel
            self.channel = grpc.insecure_channel(
                '{}:{}'.format(self.host, self.server_port))
    
            # bind the client and the server
            self.stub = pb2_grpc.UnaryStub(self.channel)
    
        def get_url(self, message):
            """
            Client function to call the rpc for GetServerResponse
            """
            message = pb2.Message(message=message)
            print(f'{message}')
            return self.stub.GetServerResponse(message)
    
    
    if __name__ == '__main__':
        client = UnaryClient()
        result = client.get_url(message="Hello Server you there?")
        print(f'{result}')

    In the __init__ function, we have initialized the stub using `self.stub = pb2_grpc.UnaryStub(self.channel)`, and we have a get_url function which calls the server using the above-initialized stub.

    This completes the implementation of Unary gRPC service.

    Let’s check the output:-

    Run -> python3 unary_client.py 

    Output:-

    message: “Hello Server you there?”

    message: “Hello I am up and running. Received ‘Hello Server you there?’ message from you”

    received: true

    Bidirectional Implementation

    syntax = "proto3";
    
    package bidirectional;
    
    service Bidirectional {
      // A Bidirectional streaming RPC.
      //
      // Accepts a stream of Message sent while a route is being traversed,
       rpc GetServerResponse(stream Message) returns (stream Message) {}
    }
    
    message Message {
      string message = 1;
    }

    In the above code, we have declared a service named Bidirectional. For now, I have implemented a single RPC, GetServerResponse(), which takes a stream of Message as input and returns a stream of Message. Below the service declaration, I have declared Message.

    Once we are done with the creation of the .proto file, we need to generate the stubs. To generate them, we need to execute the below command:-

    python -m grpc_tools.protoc --proto_path=. ./bidirectional.proto --python_out=. --grpc_python_out=.

    Two files are generated named bidirectional_pb2.py and bidirectional_pb2_grpc.py. Using these two stub files, we will implement the gRPC server and client.

    Implementing the Server

    from concurrent import futures
    
    import grpc
    import bidirectional.bidirectional_pb2_grpc as bidirectional_pb2_grpc
    
    
    class BidirectionalService(bidirectional_pb2_grpc.BidirectionalServicer):
    
        def GetServerResponse(self, request_iterator, context):
            for message in request_iterator:
                yield message
    
    
    def serve():
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        bidirectional_pb2_grpc.add_BidirectionalServicer_to_server(BidirectionalService(), server)
        server.add_insecure_port('[::]:50051')
        server.start()
        server.wait_for_termination()
    
    
    if __name__ == '__main__':
        serve()

    In the gRPC server file, there is a GetServerResponse() method which takes a stream of `Message` from the client and returns a stream of `Message`, with the two streams independent of each other. The serve() function is called from the main block and makes sure that the server listens at all times.

    We will run the bidirectional_server to start the server:

    python3 bidirectional_server.py

    Implementing the Client

    from __future__ import print_function
    
    import grpc
    import bidirectional.bidirectional_pb2_grpc as bidirectional_pb2_grpc
    import bidirectional.bidirectional_pb2 as bidirectional_pb2
    
    
    def make_message(message):
        return bidirectional_pb2.Message(
            message=message
        )
    
    
    def generate_messages():
        messages = [
            make_message("First message"),
            make_message("Second message"),
            make_message("Third message"),
            make_message("Fourth message"),
            make_message("Fifth message"),
        ]
        for msg in messages:
            print("Hello Server Sending you the %s" % msg.message)
            yield msg
    
    
    def send_message(stub):
        responses = stub.GetServerResponse(generate_messages())
        for response in responses:
            print("Hello from the server received your %s" % response.message)
    
    
    def run():
        with grpc.insecure_channel('localhost:50051') as channel:
            stub = bidirectional_pb2_grpc.BidirectionalStub(channel)
            send_message(stub)
    
    
    if __name__ == '__main__':
        run()

    In the run() function, we have initialised the stub using `stub = bidirectional_pb2_grpc.BidirectionalStub(channel)`.

    And we have a send_message() function, to which the stub is passed; it streams messages to the server and receives the responses from the server as they arrive.

    This completes the implementation of the bidirectional gRPC service.

    Let’s check the output:

    Run -> python3 bidirectional_client.py 

    Output:-

    Hello Server Sending you the First message

    Hello Server Sending you the Second message

    Hello Server Sending you the Third message

    Hello Server Sending you the Fourth message

    Hello Server Sending you the Fifth message

    Hello from the server received your First message

    Hello from the server received your Second message

    Hello from the server received your Third message

    Hello from the server received your Fourth message

    Hello from the server received your Fifth message

    For code reference, please visit here.

    Conclusion‍

    gRPC is an emerging RPC framework that makes communication between microservices smooth and efficient. I believe gRPC is currently confined mostly to inter-microservice communication, but it has many other uses that we will see in the coming years. To know more about modern data communication solutions, check out this blog.

  • The Ultimate Beginner’s Guide to Jupyter Notebooks

    Jupyter Notebooks offer a great way to write and iterate on your Python code. They are a powerful tool for developing data science projects interactively. A Jupyter Notebook lets you showcase source code and its corresponding output in a single place, combining narrative text, visualizations and other rich media. The intuitive workflow promotes rapid, iterative development, making notebooks a first choice for data scientists. Creating Jupyter Notebooks is completely free, as they are part of Project Jupyter, which is fully open source.

    Project Jupyter is the successor to an earlier project, IPython Notebook, which was first published as a prototype in 2010. Jupyter Notebook is built on top of IPython, an interactive tool for executing Python code in the terminal using the REPL (Read-Eval-Print Loop) model. The IPython kernel executes the Python code and communicates with the Jupyter Notebook front-end interface. Jupyter Notebooks extend IPython with additional features, such as storing your code and output alongside Markdown notes.

    Although Jupyter Notebooks support using various programming languages, we will focus on Python and its application in this article.

    Getting Started with Jupyter Notebooks!

    Installation

    Prerequisites

    As you may have surmised from the introduction above, we need to have Python installed on your machine. Either Python 2.7 or Python 3 will do.

    Install Using Anaconda

    The simplest way to get started with Jupyter Notebooks is by installing it using Anaconda. Anaconda installs both Python3 and Jupyter and also includes quite a lot of packages commonly used in the data science and machine learning community. You can follow the latest guidelines from here.

    Install Using Pip

    If, for some reason, you decide not to use Anaconda, then you can install Jupyter manually using Python's pip package manager; just run the command below:

    pip install jupyter

    Launching First Notebook

    Open your terminal, navigate to the directory where you would like to store your notebooks, and type the command below. The program will instantiate a local server at http://localhost:8888/tree.

    jupyter notebook

    A new window with the Jupyter Notebook interface will open in your internet browser. As you might have already noticed, Jupyter starts up a local Python server to serve web apps in your browser, where you can access the Dashboard and work with Jupyter Notebooks. Jupyter Notebooks are platform-independent, which makes them easier to collaborate on and share with others.

    The list of all files is displayed under the Files tab, while all running processes can be viewed under the Running tab. The third tab, Clusters, comes from IPython parallel, IPython’s parallel computing framework, and helps you control multiple engines extended from the IPython kernel.

    Let’s start by making a new notebook. We can easily do this by clicking on the New drop-down list in the top-right corner of the dashboard. You will see that you have the option to make a Python 3 notebook as well as a regular text file, a folder, and a terminal. Please select the Python 3 notebook option.

    Your Jupyter Notebook will open in a new tab as shown in below image.

    Now each notebook is opened in a new tab so that you can work with multiple notebooks simultaneously. If you go back to the dashboard tab, you will see the new file Untitled.ipynb with a green icon to its left, which indicates your new notebook is running.


    Why a .ipynb file?

    .ipynb is the standard file format for storing Jupyter Notebooks, hence the file name Untitled.ipynb. Let’s begin by first understanding what an .ipynb file is and what it might contain. Each .ipynb file is a text file that describes the content of your notebook in a JSON format. The content of each cell, whether it is text, code or image attachments that have been converted into strings, along with some additional metadata is stored in the .ipynb file. You can also edit the metadata by selecting “Edit > Edit Notebook Metadata” from the menu options in the notebook.
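    Because the file is plain JSON, you can inspect it with nothing but the standard library. The notebook payload below is a hypothetical, heavily simplified example; a real .ipynb file carries more metadata, but the overall shape is the same:

    ```python
    import json

    # A simplified stand-in for the text of an .ipynb file.
    notebook_json = json.dumps({
        "nbformat": 4,
        "nbformat_minor": 5,
        "metadata": {"kernelspec": {"name": "python3"}},
        "cells": [
            {"cell_type": "markdown", "metadata": {},
             "source": ["# A heading"]},
            {"cell_type": "code", "metadata": {}, "execution_count": 1,
             "source": ["print('Hello World!')"],
             "outputs": [{"output_type": "stream", "name": "stdout",
                          "text": ["Hello World!\n"]}]},
        ],
    })

    # Since it is just JSON, listing the cell types is a one-liner.
    cells = json.loads(notebook_json)["cells"]
    print([cell["cell_type"] for cell in cells])  # ['markdown', 'code']
    ```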

    You can also view the content of your notebook files by selecting “Edit” from the controls on the dashboard, but there’s no reason to do so unless you really want to edit the file manually.

    Understanding the Notebook Interface

    Now that you have created a notebook, let’s have a look at the various menu options and functions which are readily available. Take some time out to scroll through the list of commands that opens up when you click on the keyboard icon (or press Ctrl + Shift + P).

    There are two prominent terms that you should learn about: cells and kernels. They are key both to understanding Jupyter and to what makes it more than just a content writing tool. Fortunately, these concepts are not difficult to understand.

    • A kernel is a program that interprets and executes the user’s code. The Jupyter Notebook App has an inbuilt kernel for Python code, but there are also kernels available for other programming languages.
    • A cell is a container that holds executable code or formatted text.

    Cells

    Cells form the body of a notebook. If you look at the screenshot above for a new notebook (Untitled.ipynb), the text box with the green border is an empty cell. There are 4 types of cells:

    • Code – This is where you type your code and when executed the kernel will display its output below the cell.
    • Markdown – This is where you type your text formatted using Markdown and the output is displayed in place when it is run.
    • Raw NBConvert – Content in these cells is passed through unmodified when the notebook is converted to another format (like HTML, PDF etc.) using the nbconvert tool.
    • Heading – This is where you add Headings to separate sections and make your notebook look tidy and neat. This has now been merged into the Markdown option itself. Adding a ‘#’ at the beginning ensures that whatever you type after that will be taken as a heading.

    Let’s test out how the cells work with a basic “hello world” example. Type print("Hello World!") in the cell and press Ctrl + Enter or click on the Run button in the toolbar at the top.

    print("Hello World!")

    Hello World!

    When you run the cell, the output is displayed below it, and the label to its left changes from In [ ] to In [1]. While the cell is still running, Jupyter shows the label as In [*].

    Additionally, it is important to note that the output of a code cell comes from any print statements in the cell, as well as the value of the last line in the cell, whether it is a variable, a function call or some other expression.
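    As a rough sketch of that rule (not Jupyter's actual implementation), the notebook evaluates the final expression and, if the value is not None, displays its repr() below the cell:

    ```python
    # Toy model of a code cell's display rule: anything printed appears in
    # the output, and the repr() of the last expression is shown as well,
    # unless that value is None.
    cell_last_line = "40 + 2"
    value = eval(cell_last_line)
    displayed = repr(value) if value is not None else ""
    print(displayed)  # a notebook would show: 42
    ```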

    Markdown

    Markdown is a lightweight markup language for formatting plain text. Its syntax has a close correspondence with HTML tags. As this article has been written in a Jupyter notebook, all of the narrative text and images you can see are written in Markdown. Let’s go through the basics with the following example.

    # This is a level 1 heading 
    ### This is a level 3 heading
    This is how you write some plain text that would form a paragraph.
    You can emphasize text by enclosing it in "**" or "__" to make it bold, or in "*" or "_" to make it italic.
    Paragraphs are separated by an empty line.
    * We can include lists.
      * And also indent them.
    
    1. Getting Numbered lists is
    2. Also easy.
    
    [To include hyperlinks enclose the text with square braces and then add the link url in round braces](http://www.example.com)
    
    Inline code uses single backticks: `foo()`, and code blocks use triple backticks:
    
    ``` 
    foo()
    ```
    
    Or can be indented by 4 spaces: 
    
        foo()
        
    And finally, adding images is easy: ![Online Image](https://www.example.com/image.jpg) or ![Local Image](img/image.jpg) or ![Image Attachment](attachment:image.jpg)

    We have 3 different ways to attach images

    • Link the URL of an image from the web.
    • Use relative path of an image present locally
    • Add an attachment to the notebook by using the “Edit > Insert Image” option; this method converts the image into a string and stores it inside your notebook.

    Note that adding an image as an attachment will make the .ipynb file much larger because it is stored inside the notebook in a string format.
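    The growth comes from the encoding: attached images are stored base64-encoded inside the notebook's JSON, and base64 represents every 3 bytes of binary data as 4 ASCII characters, roughly a 33% size increase before any JSON overhead. A quick check:

    ```python
    import base64

    # Pretend this is 3,000 bytes of raw image data.
    raw = b"\x89PNG" + b"\x00" * 2996
    encoded = base64.b64encode(raw)
    print(len(raw), len(encoded))  # 3000 4000: about a third larger
    ```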

    There are a lot more features available in Markdown. To learn more about markdown, you can refer to the official guide from the creator, John Gruber, on his website.

    Kernels

    Every notebook runs on top of a kernel. Whenever you execute a code cell, the content of the cell is executed within the kernel and any output is returned to the cell for display. The kernel’s state applies to the document as a whole, not to individual cells, and persists over time.

    For example, if you declare a variable or import some libraries in a cell, they will be accessible in other cells. Now let’s understand this with the help of an example. First we’ll import a Python package and then define a function.

    import os, binascii
    def sum(x,y):
      return x+y

    Once the cell above is executed, we can reference os, binascii and sum in any other cell.

    rand_hex_string = binascii.b2a_hex(os.urandom(15)) 
    print(rand_hex_string)
    x = 1
    y = 2
    z = sum(x,y)
    print('Sum of %d and %d is %d' % (x, y, z))

    The output should look something like this:

    c84766ca4a3ce52c3602bbf02ad1f7
    Sum of 1 and 2 is 3

    The execution flow of a notebook is generally top-to-bottom, but it’s common to go back to make changes. The execution order shown to the left of each cell, such as In [2], lets you know whether any of your cells have stale output. Additionally, there are multiple options in the Kernel menu which often come in very handy.

    • Restart: restarts the kernel, thus clearing all the variables etc that were defined.
    • Restart & Clear Output: same as above but will also wipe the output displayed below your code cells.
    • Restart & Run All: same as above but will also run all your cells in order from top-to-bottom.
    • Interrupt: If your kernel is ever stuck on a computation and you wish to stop it, you can choose the Interrupt option.

    Naming Your Notebooks

    It is always a best practice to give a meaningful name to your notebooks. You can rename your notebooks from the notebook app itself by double-clicking on the existing name at the top left corner. You can also use the dashboard or the file browser to rename the notebook file. We’ll head back to the dashboard to rename the file we created earlier, which will have the default notebook file name Untitled.ipynb.

    Now that you are back on the dashboard, you can simply select your notebook and click “Rename” in the dashboard controls.

    Jupyter notebook - Rename

    Shutting Down your Notebooks

    We can shutdown a running notebook by selecting “File > Close and Halt” from the notebook menu. However, we can also shutdown the kernel either by selecting the notebook in the dashboard and clicking “Shutdown” or by going to “Kernel > Shutdown” from within the notebook app (see images below).

    Shutdown the kernel from Notebook App:


    Shutdown the kernel from Dashboard:


    Sharing Your Notebooks

    When we talk about sharing a notebook, two things might come to mind. In most cases, we want to share the end result of the work, i.e. a non-interactive, pre-rendered version of the notebook, much like this article. In other cases, we might want to share the code and collaborate with others on notebooks with the aid of a version control system such as Git, which is also possible.

    Before You Start Sharing

    The state of a shared notebook, including the output of any code cells, is preserved when it is exported to a file. Hence, to ensure that the notebook is share-ready, we should follow the steps below before sharing.

    1. Click “Cell > All Output > Clear”
    2. Click “Kernel > Restart & Run All”
    3. After the code cells have finished executing, validate the output. 

    This ensures that your notebooks don’t have a stale state or contain intermediary output.

    Exporting Your Notebooks

    Jupyter has built-in support for exporting to HTML, Markdown and PDF as well as several other formats, which you can find from the menu under “File > Download as” . It is a very convenient way to share the results with others. But if sharing exported files isn’t suitable for you, there are some other popular methods of sharing the notebooks directly on the web.

    • GitHub – Home to over 2 million notebooks, GitHub is the most popular place for sharing Jupyter projects with the world. GitHub has integrated support for rendering .ipynb files directly, both in repositories and in gists on its website. You can follow the GitHub guides to get started on your own.
    • Nbviewer – NBViewer is one of the most prominent notebook renderers on the web. It also renders notebooks from GitHub and other code storage platforms and provides a shareable URL along with them. nbviewer.jupyter.org provides a free rendering service as part of Project Jupyter.

    Data Analysis in a Jupyter Notebook

    Now that we’ve looked at what a Jupyter Notebook is, it’s time to look at how they’re used in practice, which should give you a clearer understanding of why they are so popular. As we walk through the sample analysis, you will be able to see how the flow of a notebook makes the task intuitive to work through ourselves, as well as for others to understand when we share it with them. We also hope to learn some of the more advanced features of Jupyter notebooks along the way. So let’s get started, shall we?

    Analyzing the Revenue and Profit Trends of Fortune 500 US companies from 1955-2013

    So, let’s say you’ve been tasked with finding out how the revenues and profits of the largest companies in the US changed historically over the past 60 years. We shall begin by gathering the data to analyze.

    Gathering the DataSet

    The data set that we will be using to analyze the revenue and profit trends of Fortune 500 companies has been sourced from Fortune 500 Archives and Top Foreign Stocks. For your ease, we have compiled the data from both sources and created a CSV for you.

    Importing the Required Dependencies

    Let’s start off with a code cell specifically for imports and initial setup, so that if we need to add or change anything at a later point in time, we can simply edit and re-run the cell without having to change the other cells. We can start by importing Pandas to work with our data, Matplotlib to plot the charts and Seaborn to make our charts prettier.

    import pandas as pd
    import matplotlib.pyplot as plt
    import seaborn as sns
    import sys

    Set the design styles for the charts

    sns.set(style="darkgrid")

    Load the Input Data to be Analyzed

    As we plan on using pandas to aid in our analysis, let’s begin by importing our input data set into the most widely used pandas data-structure, DataFrame.

    df = pd.read_csv('../data/fortune500_1955_2013.csv')

    Now that we are done loading our input dataset, let us see what it looks like!

    df.head()

    Looking good. Each row corresponds to a single company per year and all the columns we need are present.

    Exploring the Dataset

    Next, let’s begin by exploring our data set. We will primarily look into the number of records imported and the data types for each of the different columns that were imported.

    As we have 500 data points per year and since the data set has records between 1955 and 2012, the total number of records in the dataset looks good!

    Now, let’s move on to the individual data types for each of the columns.

    df.columns = ['year', 'rank', 'company', 'revenue', 'profit']
    len(df)

    df.dtypes

    As we can see from the output of the above command, the data types for the revenue and profit columns are shown as object, whereas the expected data type is float. This indicates that there may be some non-numeric values in the revenue and profit columns.

    So let’s first look at the details of imported values for revenue.

    non_numeric_revenues = df.revenue.str.contains('[^0-9.-]')
    df.loc[non_numeric_revenues].head()

    print("Number of Non-numeric revenue values: ", len(df.loc[non_numeric_revenues]))

    Number of Non-numeric revenue values:	1

    print("List of distinct Non-numeric revenue values: ", set(df.revenue[non_numeric_revenues]))

    List of distinct Non-numeric revenue values:	{'N.A.'}

    The number of non-numeric revenue values is tiny compared to the total size of our data set, so it is easiest to simply remove those rows.

    df = df.loc[~non_numeric_revenues]
    df.revenue = df.revenue.apply(pd.to_numeric)
    eval(In[6])  # re-evaluate the earlier dtypes-check cell from IPython's input history

    Now that the data type issue for column revenue is resolved, let’s move on to values in column profit.

    non_numeric_profits = df.profit.str.contains('[^0-9.-]')
    df.loc[non_numeric_profits].head()

    print("Number of Non-numeric profit values: ", len(df.loc[non_numeric_profits]))

    Number of Non-numeric profit values:	374

    print("List of distinct Non-numeric profit values: ", set(df.profit[non_numeric_profits]))

    List of distinct Non-numeric profit values:	{'N.A.'}

    The number of non-numeric profit values is around 1.5% of our data set: a small percentage, but not completely inconsequential. Let’s take a quick look at the distribution of values; if the rows having N.A. values are uniformly distributed over the years, it would be reasonable to simply remove them.

    bin_sizes, _, _ = plt.hist(df.year[non_numeric_profits], bins=range(1955, 2013))

    As observed from the histogram above, the majority of years have fewer than 25 invalid values, so removing these values would account for less than 4% of the data, as there are 500 data points per year. Also, other than a surge around 1990, most years have fewer than 10 values missing. Let’s assume that this is acceptable for us and move ahead with removing these rows.

    df = df.loc[~non_numeric_profits]
    df.profit = df.profit.apply(pd.to_numeric)

    We should validate that this worked!

    eval(In[6])  # re-evaluate the earlier dtypes-check cell from IPython's input history

    Hurray! Our dataset has been cleaned up.

    Time to Plot the graphs

    Let’s begin by defining a function to plot the graph, set the title and add labels for the x-axis and y-axis.

    # function to plot the graphs for average revenues or profits of the fortune 500 companies against year
    def plot(x, y, ax, title, y_label):
        ax.set_title(title)
        ax.set_ylabel(y_label)
        ax.plot(x, y)
        ax.margins(x=0, y=0)
        
    # function to plot the graphs with superimposed standard deviation    
    def plot_with_std(x, y, stds, ax, title, y_label):
        ax.fill_between(x, y - stds, y + stds, alpha=0.2)
        plot(x, y, ax, title, y_label)

    Let’s plot the average profit by year and average revenue by year using Matplotlib.

    group_by_year = df.loc[:, ['year', 'revenue', 'profit']].groupby('year')
    avgs = group_by_year.mean()
    x = avgs.index
    y = avgs.profit
    
    fig, ax = plt.subplots()
    plot(x, y, ax, 'Increase in mean Fortune 500 company profits from 1955 to 2013', 'Profit (millions)')

    y2 = avgs.revenue
    fig, ax = plt.subplots()
    plot(x, y2, ax, 'Increase in mean Fortune 500 company revenues from 1955 to 2013', 'Revenue (millions)')

    Woah! The chart for profits has some huge ups and downs. They seem to correspond to the early 1990s recession, the dot-com bubble in the early 2000s and the Great Recession in 2008.

    On the other hand, the revenues grew steadily and are comparatively stable. This also helps explain how the average profits recovered so quickly after the staggering drops caused by the recessions.

    Let’s also take a look at how the average profits and revenues compare to their standard deviations.

    fig, (ax1, ax2) = plt.subplots(ncols=2)
    title = 'Increase in mean and std Fortune 500 company %s from 1955 to 2013'
    stds1 = group_by_year.std().profit.values
    stds2 = group_by_year.std().revenue.values
    plot_with_std(x, y.values, stds1, ax1, title % 'profits', 'Profit (millions)')
    plot_with_std(x, y2.values, stds2, ax2, title % 'revenues', 'Revenue (millions)')
    fig.set_size_inches(14, 4)
    fig.tight_layout()


    That’s astonishing: the standard deviations are huge. Some companies are making billions while others are losing as much, and the risk has certainly increased along with rising profits and revenues over the years. Although we could keep playing around with our data set and plot plenty more charts, it is time to bring this article to a close.

    Conclusion

    As part of this article, we have seen various features of Jupyter notebooks, from basics like installation and creating and running code cells to more advanced features like plotting graphs. The power of Jupyter Notebooks to promote a productive working experience and their ease of use are evident from the above example, and I hope that you feel confident to begin using Jupyter Notebooks in your own work and to start exploring more advanced features. You can read more about data analytics using Pandas here.

    If you’d like to further explore and want to look at more examples, Jupyter has put together A Gallery of Interesting Jupyter Notebooks that you may find helpful and the Nbviewer homepage provides a lot of examples for further references. Find the entire code here on Github.

  • How to Implement Server Sent Events Using Python Flask and React

    A typical request-response cycle works like this: the client sends a request to the server, and the server responds to that request. But there are a few use cases where we might need to send data from the server without a request, or where the client expects data that can arrive at an arbitrary time. A few mechanisms are available to solve this problem.

    Server Sent Events

    Broadly, we can classify these as client pull and server push mechanisms. WebSockets are a bidirectional mechanism where data is transmitted via a full-duplex TCP connection. Client pull can be done using various mechanisms:

    1. Manual refresh – the client is refreshed manually.
    2. Long polling – the client sends a request to the server and waits until a response is received; as soon as it gets the response, it sends a new request.
    3. Short polling – the client continuously sends requests to the server at short, fixed intervals.
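    To make the client pull pattern concrete, here is a minimal sketch of short polling; fetch_update is a hypothetical stand-in for an HTTP request to the server:

    ```python
    import time

    def short_poll(fetch_update, interval_s, max_polls):
        """Naive short polling: ask the server for the latest data on a
        fixed interval, whether or not anything new has happened."""
        results = []
        for _ in range(max_polls):
            results.append(fetch_update())  # each call is a full round trip
            time.sleep(interval_s)
        return results

    # Hypothetical stand-in for an HTTP GET returning the latest update.
    updates = iter(["tick-1", "tick-2", "tick-3"])
    print(short_poll(lambda: next(updates), 0.01, 3))  # ['tick-1', 'tick-2', 'tick-3']
    ```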

    Server-sent events (SSE) are a type of server push mechanism, where the client subscribes to a stream of updates generated by the server and, whenever a new event occurs, a notification is sent to the client.
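    Under the hood, SSE is a long-lived HTTP response with the Content-Type text/event-stream, and each event is a small block of text terminated by a blank line. The formatter below is a sketch of that wire format, not what flask_sse does internally:

    ```python
    def format_sse(data, event=None):
        """Render one server-sent event: an optional 'event:' line naming
        the event type, a 'data:' line with the payload, then a blank line."""
        message = f"data: {data}\n\n"
        if event is not None:
            message = f"event: {event}\n{message}"
        return message

    print(format_sse("hello", event="publish"), end="")
    # event: publish
    # data: hello
    ```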

    Why server-sent events are better than polling:

    • Scaling and orchestration of backend in real time needs to be managed as users grow.
    • When mobile devices rapidly switch between WiFi and cellular networks or lose connections, and the IP address changes, long polling needs to re-establish connections.
    • With long polling, we need to manage the message queue and catch up on missed messages.
    • Long polling needs to provide load balancing or fail-over support across multiple servers.

    SSE vs Websockets

    SSEs cannot provide bidirectional client-server communication, as WebSockets can. Use cases that require such communication include real-time multiplayer games and messaging and chat apps. When there’s no need to send data from the client, SSE can be a better option than WebSockets; examples of such use cases are status updates, news feeds and other automated data push mechanisms. The backend implementation can also be simpler with SSE than with WebSockets. Note, however, that browsers limit the number of open SSE connections per domain.

    Also, learn about WS vs SSE here.

    Implementation

    The server-side code for this can be implemented in any high-level language. Here is sample code for SSE with Python Flask. Flask-SSE requires a broker such as Redis to store the messages. Here we are also using Flask-APScheduler to schedule background processes with Flask.

    Here we need to install and import flask_sse and apscheduler.

    import datetime

    from flask import Flask, render_template
    from flask_sse import sse
    from apscheduler.schedulers.background import BackgroundScheduler

    Now we need to initialize the Flask app, provide the config for Redis, and register a route (URL prefix) where the client will listen for events.

    app = Flask(__name__)
    app.config["REDIS_URL"] = "redis://localhost"
    app.register_blueprint(sse, url_prefix='/stream')

    To publish data to a stream, we call SSE’s publish method and provide an event type.

    sse.publish({"message": datetime.datetime.now()}, type='publish')

    On the client, we add an event listener that subscribes to our stream and reads messages.

    var source = new EventSource("{{ url_for('sse.stream') }}");
        source.addEventListener('publish', function(event) {
            var data = JSON.parse(event.data);
            console.log("The server says " + data.message);
        }, false);
        source.addEventListener('error', function(event) {
            console.log("Error"+ event)
            alert("Failed to connect to event stream. Is Redis running?");
        }, false);

    Check out a sample Flask-React-Redis based application demo for server side events.

    Here are some screenshots of client –

    Fig: First Event

     Fig: Second Event

    Server logs:

    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 31, 0, 24564))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 31, 14, 30164))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 31, 28, 37840))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 31, 42, 58162))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 31, 56, 46456))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 32, 10, 56276))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 32, 24, 58445))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 32, 38, 57183))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 32, 52, 65886))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 33, 6, 49818))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 33, 20, 22731))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 33, 34, 59084))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 33, 48, 70346))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 34, 2, 58889))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 34, 16, 26020))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 34, 30, 44040))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 34, 44, 61620))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 34, 58, 38699))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 35, 12, 26067))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 35, 26, 71504))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 35, 40, 31429))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 35, 54, 74451))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 36, 8, 63001))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 36, 22, 47671))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 36, 36, 55458))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 36, 50, 68975))
    api_1    | ('Event Scheduled at ', datetime.datetime(2019, 5, 1, 7, 37, 4, 62491))
    api_1    | INFO:apscheduler.executors.default:Job "server_side_event (trigger: interval[0:00:14], next run at: 2019-05-01 07:37:31 UTC)" executed successfully
    api_1    | INFO:apscheduler.executors.default:Running job "server_side_event (trigger: interval[0:00:16], next run at: 2019-05-01 07:37:38 UTC)" (scheduled at 2019-05-01 07:37:22.362874+00:00)
    api_1    | INFO:apscheduler.executors.default:Job "server_side_event (trigger: interval[0:00:16], next run at: 2019-05-01 07:37:38 UTC)" executed successfully
    api_1    | INFO:apscheduler.executors.default:Running job "server_side_event (trigger: interval[0:00:14], next run at: 2019-05-01 07:37:31 UTC)" (scheduled at 2019-05-01 07:37:31.993944+00:00)
    api_1    | INFO:apscheduler.executors.default:Job "server_side_event (trigger: interval[0:00:14], next run at: 2019-05-01 07:37:45 UTC)" executed successfully
    api_1    | INFO:apscheduler.executors.default:Running job "server_side_event (trigger: interval[0:00:16], next run at: 2019-05-01 07:37:54 UTC)" (scheduled at 2019-05-01 07:37:38.362874+00:00)
    api_1    | INFO:apscheduler.executors.default:Job "server_side_event (trigger: interval[0:00:16], next run at: 2019-05-01 07:37:54 UTC)" executed successfully

    Use Cases of Server-Sent Events

    Let’s look at a use case with an example. Suppose we have a real-time graph on our web app. One option is polling, where the client continuously asks the server for new data. The other option is server-sent events, which are asynchronous: the server pushes data to the client whenever an update happens.

    Other applications include:

    • Real-time stock price analysis systems
    • Real-time social media feeds
    • Resource monitoring for health and uptime
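Under the hood, the server push in all of these cases rides on a very simple wire format: each SSE message is a plain-text frame of `data:` lines (optionally preceded by an `event:` line) terminated by a blank line. A minimal helper to build such frames — a sketch for illustration, not code from this blog's project — might look like:

```python
import json


def format_sse(data, event=None):
    """Format a payload as a Server-Sent Events frame.

    A frame is plain text: an optional 'event:' line, one or more
    'data:' lines, and a terminating blank line.
    """
    frame = ""
    if event is not None:
        frame += "event: %s\n" % event
    frame += "data: %s\n\n" % json.dumps(data)
    return frame


# The kind of frame a scheduled job could push on every interval.
frame = format_sse({"scheduled_at": "2019-05-01T07:35:12"}, event="update")
print(frame)
```

The client's `EventSource` parses these frames automatically and fires a message (or named event) for each one.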

    Conclusion

    In this blog, we covered how to implement server-sent events using Python Flask and React, and how to combine them with background schedulers. This pattern can be used to deliver data from the server to the client via server push, without client-side polling.

  • Deploy Serverless, Event-driven Python Applications Using Zappa

    Introduction

    Zappa is a very powerful open-source Python project that lets you easily build, deploy, and update a WSGI app hosted on AWS Lambda + API Gateway. This blog is a detailed step-by-step guide focusing on the challenges faced while deploying a Django application on AWS Lambda using Zappa as the deployment tool.

    Building Your Application

    If you do not have a Django application already you can build one by cloning this GitHub repository.

    $ git clone https://github.com/velotiotech/django-zappa-sample.git    

    Cloning into 'django-zappa-sample'...
    remote: Counting objects: 18, done.
    remote: Compressing objects: 100% (13/13), done.
    remote: Total 18 (delta 1), reused 15 (delta 1), pack-reused 0
    Unpacking objects: 100% (18/18), done.
    Checking connectivity... done.

    Once you have cloned the repository, you will need a virtual environment, which provides an isolated Python environment for your application. I prefer virtualenvwrapper to create one.

    Command:

    $ mkvirtualenv django_zappa_sample 

    Installing setuptools, pip, wheel...done.
    virtualenvwrapper.user_scripts creating /home/velotio/Envs/django_zappa_sample/bin/predeactivate
    virtualenvwrapper.user_scripts creating /home/velotio/Envs/django_zappa_sample/bin/postdeactivate
    virtualenvwrapper.user_scripts creating /home/velotio/Envs/django_zappa_sample/bin/preactivate
    virtualenvwrapper.user_scripts creating /home/velotio/Envs/django_zappa_sample/bin/postactivate
    virtualenvwrapper.user_scripts creating /home/velotio/Envs/django_zappa_sample/bin/get_env_details

    Install dependencies from requirements.txt.

    $ pip install -r requirements.txt

    Collecting Django==1.11.11 (from -r requirements.txt (line 1))
      Downloading https://files.pythonhosted.org/packages/d5/bf/2cd5eb314aa2b89855c01259c94dc48dbd9be6c269370c1f7ae4979e6e2f/Django-1.11.11-py2.py3-none-any.whl (6.9MB)
        100% |████████████████████████████████| 7.0MB 772kB/s 
    Collecting zappa==0.45.1 (from -r requirements.txt (line 2))
    Collecting pytz (from Django==1.11.11->-r requirements.txt (line 1))
      Downloading https://files.pythonhosted.org/packages/dc/83/15f7833b70d3e067ca91467ca245bae0f6fe56ddc7451aa0dc5606b120f2/pytz-2018.4-py2.py3-none-any.whl (510kB)
        100% |████████████████████████████████| 512kB 857kB/s 
    Collecting future==0.16.0 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting toml>=0.9.3 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting docutils>=0.12 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/50/09/c53398e0005b11f7ffb27b7aa720c617aba53be4fb4f4f3f06b9b5c60f28/docutils-0.14-py2-none-any.whl
    Collecting PyYAML==3.12 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting futures==3.1.1 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/a6/1c/72a18c8c7502ee1b38a604a5c5243aa8c2a64f4bba4e6631b1b8972235dd/futures-3.1.1-py2-none-any.whl
    Requirement already satisfied: wheel>=0.30.0 in /home/velotio/Envs/django_zappa_sample/lib/python2.7/site-packages (from zappa==0.45.1->-r requirements.txt (line 2)) (0.31.1)
    Collecting base58==0.2.4 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting durationpy==0.5 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting kappa==0.6.0 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/ed/cf/a8aa5964557c8a4828da23d210f8827f9ff190318838b382a4fb6f118f5d/kappa-0.6.0-py2-none-any.whl
    Collecting Werkzeug==0.12 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/ae/c3/f59f6ade89c811143272161aae8a7898735e7439b9e182d03d141de4804f/Werkzeug-0.12-py2.py3-none-any.whl
    Collecting boto3>=1.4.7 (from zappa==0.45.1->-r requirements.txt (line 2))
      Downloading https://files.pythonhosted.org/packages/cd/a3/4d1caf76d8f5aac8ab1ffb4924ecf0a43df1572f6f9a13465a482f94e61c/boto3-1.7.24-py2.py3-none-any.whl (128kB)
        100% |████████████████████████████████| 133kB 1.1MB/s 
    Collecting six>=1.11.0 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/67/4b/141a581104b1f6397bfa78ac9d43d8ad29a7ca43ea90a2d863fe3056e86a/six-1.11.0-py2.py3-none-any.whl
    Collecting tqdm==4.19.1 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/c0/d3/7f930cbfcafae3836be39dd3ed9b77e5bb177bdcf587a80b6cd1c7b85e74/tqdm-4.19.1-py2.py3-none-any.whl
    Collecting argcomplete==1.9.2 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/0f/ee/625763d848016115695942dba31a9937679a25622b6f529a2607d51bfbaa/argcomplete-1.9.2-py2.py3-none-any.whl
    Collecting hjson==3.0.1 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting troposphere>=1.9.0 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting python-dateutil==2.6.1 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/4b/0d/7ed381ab4fe80b8ebf34411d14f253e1cf3e56e2820ffa1d8844b23859a2/python_dateutil-2.6.1-py2.py3-none-any.whl
    Collecting botocore>=1.7.19 (from zappa==0.45.1->-r requirements.txt (line 2))
      Downloading https://files.pythonhosted.org/packages/65/98/12aa979ca3215d69111026405a9812d7bb0c9ae49e2800b00d3bd794705b/botocore-1.10.24-py2.py3-none-any.whl (4.2MB)
        100% |████████████████████████████████| 4.2MB 768kB/s 
    Collecting requests>=2.10.0 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/49/df/50aa1999ab9bde74656c2919d9c0c085fd2b3775fd3eca826012bef76d8c/requests-2.18.4-py2.py3-none-any.whl
    Collecting jmespath==0.9.3 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/b7/31/05c8d001f7f87f0f07289a5fc0fc3832e9a57f2dbd4d3b0fee70e0d51365/jmespath-0.9.3-py2.py3-none-any.whl
    Collecting wsgi-request-logger==0.4.6 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting lambda-packages==0.19.0 (from zappa==0.45.1->-r requirements.txt (line 2))
    Collecting python-slugify==1.2.4 (from zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/9f/77/ab7134b731d0e831cf82861c1ab0bb318e80c41155fa9da18958f9d96057/python_slugify-1.2.4-py2.py3-none-any.whl
    Collecting placebo>=0.8.1 (from kappa==0.6.0->zappa==0.45.1->-r requirements.txt (line 2))
    Collecting click>=5.1 (from kappa==0.6.0->zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/34/c1/8806f99713ddb993c5366c362b2f908f18269f8d792aff1abfd700775a77/click-6.7-py2.py3-none-any.whl
    Collecting s3transfer<0.2.0,>=0.1.10 (from boto3>=1.4.7->zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/d7/14/2a0004d487464d120c9fb85313a75cd3d71a7506955be458eebfe19a6b1d/s3transfer-0.1.13-py2.py3-none-any.whl
    Collecting cfn-flip>=0.2.5 (from troposphere>=1.9.0->zappa==0.45.1->-r requirements.txt (line 2))
    Collecting certifi>=2017.4.17 (from requests>=2.10.0->zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/7c/e6/92ad559b7192d846975fc916b65f667c7b8c3a32bea7372340bfe9a15fa5/certifi-2018.4.16-py2.py3-none-any.whl
    Collecting chardet<3.1.0,>=3.0.2 (from requests>=2.10.0->zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/bc/a9/01ffebfb562e4274b6487b4bb1ddec7ca55ec7510b22e4c51f14098443b8/chardet-3.0.4-py2.py3-none-any.whl
    Collecting idna<2.7,>=2.5 (from requests>=2.10.0->zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/27/cc/6dd9a3869f15c2edfab863b992838277279ce92663d334df9ecf5106f5c6/idna-2.6-py2.py3-none-any.whl
    Collecting urllib3<1.23,>=1.21.1 (from requests>=2.10.0->zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/63/cb/6965947c13a94236f6d4b8223e21beb4d576dc72e8130bd7880f600839b8/urllib3-1.22-py2.py3-none-any.whl
    Collecting Unidecode>=0.04.16 (from python-slugify==1.2.4->zappa==0.45.1->-r requirements.txt (line 2))
      Using cached https://files.pythonhosted.org/packages/59/ef/67085e30e8bbcdd76e2f0a4ad8151c13a2c5bce77c85f8cad6e1f16fb141/Unidecode-1.0.22-py2.py3-none-any.whl
    Installing collected packages: pytz, Django, future, toml, docutils, PyYAML, futures, base58, durationpy, jmespath, six, python-dateutil, botocore, s3transfer, boto3, placebo, click, kappa, Werkzeug, tqdm, argcomplete, hjson, cfn-flip, troposphere, certifi, chardet, idna, urllib3, requests, wsgi-request-logger, lambda-packages, Unidecode, python-slugify, zappa
    Successfully installed Django-1.11.11 PyYAML-3.12 Unidecode-1.0.22 Werkzeug-0.12 argcomplete-1.9.2 base58-0.2.4 boto3-1.7.24 botocore-1.10.24 certifi-2018.4.16 cfn-flip-1.0.3 chardet-3.0.4 click-6.7 docutils-0.14 durationpy-0.5 future-0.16.0 futures-3.1.1 hjson-3.0.1 idna-2.6 jmespath-0.9.3 kappa-0.6.0 lambda-packages-0.19.0 placebo-0.8.1 python-dateutil-2.6.1 python-slugify-1.2.4 pytz-2018.4 requests-2.18.4 s3transfer-0.1.13 six-1.11.0 toml-0.9.4 tqdm-4.19.1 troposphere-2.2.1 urllib3-1.22 wsgi-request-logger-0.4.6 zappa-0.45.1

    Now, if you run the server directly, it will log a warning because the database is not set up yet.

    $ python manage.py runserver  

    Performing system checks...
    
    System check identified no issues (0 silenced).
    
    You have 13 unapplied migration(s). Your project may not work properly until you apply the migrations for app(s): admin, auth, contenttypes, sessions.
    Run 'python manage.py migrate' to apply them.
    
    May 20, 2018 - 14:47:32
    Django version 1.11.11, using settings 'django_zappa_sample.settings'
    Starting development server at http://127.0.0.1:8000/
    Quit the server with CONTROL-C.

    Also, trying to access the admin page (http://localhost:8000/admin/) will throw an “OperationalError” exception, with the following log at the server end.

    Internal Server Error: /admin/
    Traceback (most recent call last):
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/core/handlers/exception.py", line 41, in inner
        response = get_response(request)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/core/handlers/base.py", line 187, in _get_response
        response = self.process_exception_by_middleware(e, request)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/core/handlers/base.py", line 185, in _get_response
        response = wrapped_callback(request, *callback_args, **callback_kwargs)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/admin/sites.py", line 242, in wrapper
        return self.admin_view(view, cacheable)(*args, **kwargs)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/utils/decorators.py", line 149, in _wrapped_view
        response = view_func(request, *args, **kwargs)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/views/decorators/cache.py", line 57, in _wrapped_view_func
        response = view_func(request, *args, **kwargs)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/admin/sites.py", line 213, in inner
        if not self.has_permission(request):
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/admin/sites.py", line 187, in has_permission
        return request.user.is_active and request.user.is_staff
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/utils/functional.py", line 238, in inner
        self._setup()
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/utils/functional.py", line 386, in _setup
        self._wrapped = self._setupfunc()
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/auth/middleware.py", line 24, in <lambda>
        request.user = SimpleLazyObject(lambda: get_user(request))
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/auth/middleware.py", line 12, in get_user
        request._cached_user = auth.get_user(request)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/auth/__init__.py", line 211, in get_user
        user_id = _get_user_session_key(request)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/auth/__init__.py", line 61, in _get_user_session_key
        return get_user_model()._meta.pk.to_python(request.session[SESSION_KEY])
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/sessions/backends/base.py", line 57, in __getitem__
        return self._session[key]
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/sessions/backends/base.py", line 207, in _get_session
        self._session_cache = self.load()
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/contrib/sessions/backends/db.py", line 35, in load
        expire_date__gt=timezone.now()
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/db/models/manager.py", line 85, in manager_method
        return getattr(self.get_queryset(), name)(*args, **kwargs)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/db/models/query.py", line 374, in get
        num = len(clone)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/db/models/query.py", line 232, in __len__
        self._fetch_all()
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/db/models/query.py", line 1118, in _fetch_all
        self._result_cache = list(self._iterable_class(self))
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/db/models/query.py", line 53, in __iter__
        results = compiler.execute_sql(chunked_fetch=self.chunked_fetch)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 899, in execute_sql
        raise original_exception
    OperationalError: no such table: django_session
    [20/May/2018 14:59:23] "GET /admin/ HTTP/1.1" 500 153553
    Not Found: /favicon.ico

    To fix this, run the migrations against your database so that essential tables such as auth_user and django_session are created before any request is made to the server.

    $ python manage.py migrate 

    Operations to perform:
      Apply all migrations: admin, auth, contenttypes, sessions
    Running migrations:
      Applying contenttypes.0001_initial... OK
      Applying auth.0001_initial... OK
      Applying admin.0001_initial... OK
      Applying admin.0002_logentry_remove_auto_add... OK
      Applying contenttypes.0002_remove_content_type_name... OK
      Applying auth.0002_alter_permission_name_max_length... OK
      Applying auth.0003_alter_user_email_max_length... OK
      Applying auth.0004_alter_user_username_opts... OK
      Applying auth.0005_alter_user_last_login_null... OK
      Applying auth.0006_require_contenttypes_0002... OK
      Applying auth.0007_alter_validators_add_error_messages... OK
      Applying auth.0008_alter_user_username_max_length... OK
      Applying sessions.0001_initial... OK

    NOTE: Use the DATABASES setting in your project settings file to configure the database your Django application should use once hosted on AWS Lambda. By default, it is configured to create a local SQLite database file as the backend.
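Because a Lambda container's filesystem is ephemeral, a SQLite file written at runtime will not persist between invocations; a hosted database (e.g., RDS PostgreSQL) is the usual choice for a deployed app. A sketch of such a DATABASES setting follows — the host, names, and environment variables are placeholders, not values from this project:

```python
# settings.py (sketch): point Django at an external database for Lambda.
# All defaults below are illustrative placeholders.
import os

DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql_psycopg2",
        "NAME": os.environ.get("DB_NAME", "django_zappa_sample"),
        "USER": os.environ.get("DB_USER", "app"),
        "PASSWORD": os.environ.get("DB_PASSWORD", ""),
        "HOST": os.environ.get("DB_HOST", "example.rds.amazonaws.com"),
        "PORT": os.environ.get("DB_PORT", "5432"),
    }
}
```

Reading credentials from environment variables keeps secrets out of the packaged zip; Zappa can inject them via its settings file.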

    You can run the server again and it should now load the admin panel of your website.

    Verify that the zappa Python package is installed in your virtual environment before moving forward.

    Configuring Zappa Settings

    Deploying with Zappa is simple: it only needs a configuration file, and Zappa manages the rest. To create this configuration file, run the following from your project root directory:

    $ zappa init 

    ███████╗ █████╗ ██████╗ ██████╗  █████╗
    ╚══███╔╝██╔══██╗██╔══██╗██╔══██╗██╔══██╗
      ███╔╝ ███████║██████╔╝██████╔╝███████║
     ███╔╝  ██╔══██║██╔═══╝ ██╔═══╝ ██╔══██║
    ███████╗██║  ██║██║     ██║     ██║  ██║
    ╚══════╝╚═╝  ╚═╝╚═╝     ╚═╝     ╚═╝  ╚═╝
    
    Welcome to Zappa!
    
    Zappa is a system for running server-less Python web applications on AWS Lambda and AWS API Gateway.
    This `init` command will help you create and configure your new Zappa deployment.
    Let's get started!
    
    Your Zappa configuration can support multiple production stages, like 'dev', 'staging', and 'production'.
    What do you want to call this environment (default 'dev'): 
    
    AWS Lambda and API Gateway are only available in certain regions. Let's check to make sure you have a profile set up in one that will work.
    We found the following profiles: default, and hdx. Which would you like us to use? (default 'default'): 
    
    Your Zappa deployments will need to be uploaded to a private S3 bucket.
    If you don't have a bucket yet, we'll create one for you too.
    What do you want call your bucket? (default 'zappa-108wqhyn4'): django-zappa-sample-bucket
    
    It looks like this is a Django application!
    What is the module path to your projects's Django settings?
    We discovered: django_zappa_sample.settings
    Where are your project's settings? (default 'django_zappa_sample.settings'): 
    
    You can optionally deploy to all available regions in order to provide fast global service.
    If you are using Zappa for the first time, you probably don't want to do this!
    Would you like to deploy this application globally? (default 'n') [y/n/(p)rimary]: n
    
    Okay, here's your zappa_settings.json:
    
    {
        "dev": {
            "aws_region": "us-east-1", 
            "django_settings": "django_zappa_sample.settings", 
            "profile_name": "default", 
            "project_name": "django-zappa-sa", 
            "runtime": "python2.7", 
            "s3_bucket": "django-zappa-sample-bucket"
        }
    }
    
    Does this look okay? (default 'y') [y/n]: y
    
    Done! Now you can deploy your Zappa application by executing:
    
    	$ zappa deploy dev
    
    After that, you can update your application code with:
    
    	$ zappa update dev
    
    To learn more, check out our project page on GitHub here: https://github.com/Miserlou/Zappa
    and stop by our Slack channel here: https://slack.zappa.io
    
    Enjoy!,
     ~ Team Zappa!

    You can verify the generated zappa_settings.json at your project root directory.

    TIP: The virtual environment name should not be the same as the Zappa project name, as this may cause errors.

    Additionally, you can specify other settings in the zappa_settings.json file as needed, using Zappa’s Advanced Settings.
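For example, settings such as memory_size, timeout_seconds, keep_warm, exclude, and environment_variables are documented in Zappa's README; the values below are illustrative, not recommendations. A sketch that extends the generated config and prints it as JSON:

```python
import json

# Commonly used Zappa advanced settings (values are illustrative):
# memory_size / timeout_seconds size the Lambda function, keep_warm
# toggles the scheduled keep-warm ping, exclude trims the deployment
# package, and environment_variables are injected into the Lambda
# runtime environment.
settings = {
    "dev": {
        "aws_region": "us-east-1",
        "django_settings": "django_zappa_sample.settings",
        "project_name": "django-zappa-sa",
        "s3_bucket": "django-zappa-sample-bucket",
        "memory_size": 512,
        "timeout_seconds": 30,
        "keep_warm": False,
        "exclude": ["*.gz", "*.pem"],
        "environment_variables": {"DJANGO_ENV": "dev"},
    }
}

print(json.dumps(settings, indent=4, sort_keys=True))
```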

    Now, you’re ready to deploy!

    IAM Permissions

    To deploy the Django application to Lambda/API Gateway, set up an IAM role (e.g., ZappaLambdaExecutionRole) with the following permissions:

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "iam:AttachRolePolicy",
                    "iam:CreateRole",
                    "iam:GetRole",
                    "iam:PutRolePolicy"
                ],
                "Resource": [
                    "*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iam:PassRole"
                ],
                "Resource": [
                    "arn:aws:iam:::role/*-ZappaLambdaExecutionRole"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "apigateway:DELETE",
                    "apigateway:GET",
                    "apigateway:PATCH",
                    "apigateway:POST",
                    "apigateway:PUT",
                    "events:DeleteRule",
                    "events:DescribeRule",
                    "events:ListRules",
                    "events:ListTargetsByRule",
                    "events:ListRuleNamesByTarget",
                    "events:PutRule",
                    "events:PutTargets",
                    "events:RemoveTargets",
                    "lambda:AddPermission",
                    "lambda:CreateFunction",
                    "lambda:DeleteFunction",
                    "lambda:GetFunction",
                    "lambda:GetPolicy",
                    "lambda:ListVersionsByFunction",
                    "lambda:RemovePermission",
                    "lambda:UpdateFunctionCode",
                    "lambda:UpdateFunctionConfiguration",
                    "cloudformation:CreateStack",
                    "cloudformation:DeleteStack",
                    "cloudformation:DescribeStackResource",
                    "cloudformation:DescribeStacks",
                    "cloudformation:ListStackResources",
                    "cloudformation:UpdateStack",
                    "logs:DescribeLogStreams",
                    "logs:FilterLogEvents",
                    "route53:ListHostedZones",
                    "route53:ChangeResourceRecordSets",
                    "route53:GetHostedZone",
                    "s3:CreateBucket"
                ],
                "Resource": [
                    "*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:DeleteObject",
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:CreateMultipartUpload",
                    "s3:AbortMultipartUpload",
                    "s3:ListMultipartUploadParts",
                    "s3:ListBucketMultipartUploads"
                ],
                "Resource": [
                    "arn:aws:s3:::/*"
                ]
            }
        ]
    }

    Deploying Django Application

    Before deploying the application, ensure that the IAM role is set in the config JSON as follows:

    {
        "dev": {
            ...
            "manage_roles": false, // Disable Zappa client managing roles.
            "role_name": "MyLambdaRole", // Name of your Zappa execution role. Optional, default: --ZappaExecutionRole.
            "role_arn": "arn:aws:iam::12345:role/app-ZappaLambdaExecutionRole", // ARN of your Zappa execution role. Optional.
            ...
        },
        ...
    }

    Once your settings are configured, you can package and deploy your application to a stage called “dev” with a single command:

    $ zappa deploy dev

    Calling deploy for stage dev..
    Downloading and installing dependencies..
    Packaging project as zip.
    Uploading django-zappa-sa-dev-1526831069.zip (10.9MiB)..
    100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 11.4M/11.4M [01:02<00:00, 75.3KB/s]
    Scheduling..
    Scheduled django-zappa-sa-dev-zappa-keep-warm-handler.keep_warm_callback with expression rate(4 minutes)!
    Uploading django-zappa-sa-dev-template-1526831157.json (1.6KiB)..
    100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1.60K/1.60K [00:02<00:00, 792B/s]
    Waiting for stack django-zappa-sa-dev to create (this can take a bit)..
    100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4/4 [00:11<00:00,  2.92s/res]
    Deploying API Gateway..
    Deployment complete!: https://akg59b222b.execute-api.us-east-1.amazonaws.com/dev

    You should see that your Zappa deployment completed successfully, with the URL of the API Gateway endpoint created for your application.

    Troubleshooting

    1. If you see the following error during deployment, it is probably because you do not have sufficient privileges to deploy to AWS Lambda. Ensure your IAM role has all the permissions described above, or set “manage_roles” to true so that Zappa can create and manage the IAM role for you.

    Calling deploy for stage dev..
    Creating django-zappa-sa-dev-ZappaLambdaExecutionRole IAM Role..
    Error: Failed to manage IAM roles!
    You may lack the necessary AWS permissions to automatically manage a Zappa execution role.
    To fix this, see here: https://github.com/Miserlou/Zappa#using-custom-aws-iam-roles-and-policies

    2. The error below is caused by not listing “events.amazonaws.com” as a trusted entity for your IAM role. You can add it, or set the “keep_warm” parameter to false in your Zappa settings file. Note that the deployment was only partially completed, as it terminated abnormally.

    Downloading and installing dependencies..
    100%|████████████████████████████████████████████| 44/44 [00:05<00:00, 7.92pkg/s]
    Packaging project as zip..
    Uploading django-zappa-sample-dev-1482817370.zip (8.8MiB)..
    100%|█████████████████████████████████████████| 9.22M/9.22M [00:17<00:00, 527KB/s]
    Scheduling...
    Oh no! An error occurred! :(
    
    ==============
    
    Traceback (most recent call last):
    Traceback (most recent call last):
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 2610, in handle
        sys.exit(cli.handle())
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 505, in handle
        self.dispatch_command(self.command, stage)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 539, in dispatch_command
        self.deploy(self.vargs['zip'])
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 800, in deploy
        self.zappa.add_binary_support(api_id=api_id, cors=self.cors)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/core.py", line 1490, in add_binary_support
        restApiId=api_id
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 314, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 612, in _make_api_call
        raise error_class(parsed_response, operation_name)
    ClientError: An error occurred (ValidationError) when calling the PutRole operation: Provided role 'arn:aws:iam:484375727565:role/lambda_basic_execution' cannot be assumed by principal
    'events.amazonaws.com'.
    
    ==============
    
    Need help? Found a bug? Let us know! :D
    File bug reports on GitHub here: https://github.com/Miserlou/Zappa
    And join our Slack channel here: https://slack.zappa.io
    Love!,
    ~ Team Zappa!
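Fixing the trust-relationship error above means adding “events.amazonaws.com” to the role's trust policy so CloudWatch Events (which drives keep_warm) can assume the role, alongside Lambda and API Gateway. A sketch of such a trust policy document, built and printed from Python for illustration:

```python
import json

# Trust policy sketch: the services listed under Principal are allowed
# to assume the Zappa execution role via sts:AssumeRole. Paste the JSON
# output into the role's "Trust relationships" tab in the IAM console.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "lambda.amazonaws.com",
                    "events.amazonaws.com",
                    "apigateway.amazonaws.com",
                ]
            },
            "Action": "sts:AssumeRole",
        }
    ],
}

print(json.dumps(trust_policy, indent=4))
```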

    3. Adding the parameter and running zappa update will produce the error below. As you can see, it says “Stack 'django-zappa-sa-dev' does not exist” because the previous deployment was unsuccessful. To fix this, delete the Lambda function from the console and rerun the deployment.

    Downloading and installing dependencies..
    100%|████████████████████████████████████████████| 44/44 [00:05<00:00, 7.92pkg/s]
    Packaging project as zip..
    Uploading django-zappa-sample-dev-1482817370.zip (8.8MiB)..
    100%|█████████████████████████████████████████| 9.22M/9.22M [00:17<00:00, 527KB/s]
    Updating Lambda function code..
    Updating Lambda function configuration..
    Uploading djangoo-zapppa-sample-dev-template-1482817403.json (1.5KiB)..
    100%|████████████████████████████████████████| 1.56K/1.56K [00:00<00:00, 6.56KB/s]
    CloudFormation stack missing, re-deploy to enable updates
    ERROR:Could not get API ID.
    Traceback (most recent call last):
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 2610, in handle
        sys.exit(cli.handle())
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 505, in handle
        self.dispatch_command(self.command, stage)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 539, in dispatch_command
        self.deploy(self.vargs['zip'])
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 800, in deploy
        self.zappa.add_binary_support(api_id=api_id, cors=self.cors)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/core.py", line 1490, in add_binary_support
        restApiId=api_id
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 314, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 612, in _make_api_call
        raise error_class(parsed_response, operation_name)
    ClientError: An error occurred (ValidationError) when calling the DescribeStackResource operation: Stack 'django-zappa-sa-dev' does not exist
    Deploying API Gateway..
    Oh no! An error occurred! :(
    
    ==============
    
    Traceback (most recent call last):
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 1847, in handle
    sys.exit(cli.handle())
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 345, in handle
    self.dispatch_command(self.command, environment)
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 379, in dispatch_command
    self.update()
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 605, in update
    endpoint_url = self.deploy_api_gateway(api_id)
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 1816, in deploy_api_gateway
    cloudwatch_metrics_enabled=self.zappa_settings[self.api_stage].get('cloudwatch_metrics_enabled', False),
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/zappa.py", line 1014, in deploy_api_gateway
    variables=variables or {}
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 251, in _api_call
    return self._make_api_call(operation_name, kwargs)
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 513, in _make_api_call
    api_params, operation_model, context=request_context)
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 566, in _convert_to_request_dict
    api_params, operation_model)
    File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/validate.py", line 270, in serialize_to_request
    raise ParamValidationError(report=report.generate_report())
    ParamValidationError: Parameter validation failed:
    Invalid type for parameter restApiId, value: None, type: <type 'NoneType'>, valid types: <type 'basestring'>
    
    ==============
    
    Need help? Found a bug? Let us know! :D
    File bug reports on GitHub here: https://github.com/Miserlou/Zappa
    And join our Slack channel here: https://slack.zappa.io
    Love!,
    ~ Team Zappa!

    4.  If you run into a distribution error, please try downgrading your pip version to 9.0.1.

    $ pip install pip==9.0.1   

    Calling deploy for stage dev..
    Downloading and installing dependencies..
    Oh no! An error occurred! :(
    
    ==============
    
    Traceback (most recent call last):
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 2610, in handle
        sys.exit(cli.handle())
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 505, in handle
        self.dispatch_command(self.command, stage)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 539, in dispatch_command
        self.deploy(self.vargs['zip'])
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 709, in deploy
        self.create_package()
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 2171, in create_package
        disable_progress=self.disable_progress
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/core.py", line 595, in create_lambda_zip
        installed_packages = self.get_installed_packages(site_packages, site_packages_64)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/core.py", line 751, in get_installed_packages
        pip.get_installed_distributions()
    AttributeError: 'module' object has no attribute 'get_installed_distributions'
    
    ==============
    

    or,

    If you run into a NotFoundException (Invalid REST API identifier) error, please try undeploying the Zappa stage and deploying again.

    Calling deploy for stage dev..
    Downloading and installing dependencies..
    Packaging project as zip.
    Uploading django-zappa-sa-dev-1526830532.zip (10.9MiB)..
    100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 11.4M/11.4M [00:42<00:00, 331KB/s]
    Scheduling..
    Scheduled django-zappa-sa-dev-zappa-keep-warm-handler.keep_warm_callback with expression rate(4 minutes)!
    Uploading django-zappa-sa-dev-template-1526830690.json (1.6KiB)..
    100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1.60K/1.60K [00:01<00:00, 801B/s]
    Oh no! An error occurred! :(
    
    ==============
    
    Traceback (most recent call last):
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 2610, in handle
        sys.exit(cli.handle())
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 505, in handle
        self.dispatch_command(self.command, stage)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 539, in dispatch_command
        self.deploy(self.vargs['zip'])
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/cli.py", line 800, in deploy
        self.zappa.add_binary_support(api_id=api_id, cors=self.cors)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/zappa/core.py", line 1490, in add_binary_support
        restApiId=api_id
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 314, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/home/velotio/Envs/django_zappa_sample/local/lib/python2.7/site-packages/botocore/client.py", line 612, in _make_api_call
        raise error_class(parsed_response, operation_name)
    NotFoundException: An error occurred (NotFoundException) when calling the GetRestApi operation: Invalid REST API identifier specified 484375727565:akg59b222b
    
    ==============
    
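    When you hit this error, it helps to confirm which REST API identifiers actually exist in your account and region before undeploying. A minimal sketch using boto3 (assuming boto3 is installed and AWS credentials are configured; the helper name is ours, not part of Zappa):

    ```python
    def list_rest_apis(client=None, region="us-east-1"):
        """Return (id, name) pairs for every API Gateway REST API visible to your credentials."""
        if client is None:
            import boto3  # assumes boto3 is installed and AWS credentials are configured
            client = boto3.client("apigateway", region_name=region)
        # get_rest_apis returns {"items": [{"id": ..., "name": ...}, ...]}
        return [(api["id"], api["name"]) for api in client.get_rest_apis()["items"]]
    ```

    Compare the ids this returns against the identifier in the error message; if the id from the traceback is missing, the stale Zappa stage is the likely culprit.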

    TIP: To understand how your application works in a serverless environment, please visit this link.

    Post Deployment Setup

    Migrate database

    At this point, you should have an empty database, ready for your Django application to populate with its schema.

    $ zappa manage dev migrate

    Once you run the above command, the database migrations will be applied to the database specified in your Django settings.
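    The migrations run inside AWS Lambda, so your settings.py must point at a database the function can reach over the network, such as an Amazon RDS instance. A minimal sketch; the engine, host, and credentials below are placeholders, not values from this project:

    ```python
    # settings.py -- hypothetical RDS PostgreSQL configuration; every value is a placeholder
    DATABASES = {
        "default": {
            "ENGINE": "django.db.backends.postgresql_psycopg2",
            "NAME": "django_zappa_sample",
            "USER": "dbuser",
            "PASSWORD": "change-me",
            "HOST": "your-instance.xxxxxxxxxxxx.us-east-1.rds.amazonaws.com",
            "PORT": "5432",
        }
    }
    ```

    If the database lives inside a VPC, the Lambda function must be placed in the same VPC as well (Zappa supports this via the vpc_config key in zappa_settings.json).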

    Creating Superuser of Django Application

    You also might need to create a new superuser in the database. You can use the following command from your project directory.

    $ zappa invoke --raw dev "from django.contrib.auth.models import User; User.objects.create_superuser('username', 'username@yourdomain.com', 'password')"

    Alternatively,

    $ python manage.py createsuperuser

    Note that your local environment must be connected to the same database, as this is a standard Django administration command (not a Zappa command).

    Managing static files

    Your Django application will likely depend on static files; the Django admin panel, for example, uses a combination of JS, CSS, and image files.

    NOTE: Zappa is for running your application code, not for serving static web assets. If you plan on serving custom static assets in your web application (CSS/JavaScript/images/etc.), you’ll likely want to use a combination of AWS S3 and AWS CloudFront.

    You will need to add the following packages, django-storages and boto, to your virtual environment; they are required for managing files to and from S3.

    $ pip install django-storages boto
    Add 'storages' to your INSTALLED_APPS in settings.py:
    INSTALLED_APPS = (
    ...,
    'storages',
    )
    
    Configure django-storages in settings.py:
    
    AWS_STORAGE_BUCKET_NAME = 'django-zappa-sample-bucket'
    AWS_S3_CUSTOM_DOMAIN = '%s.s3.amazonaws.com' % AWS_STORAGE_BUCKET_NAME
    STATIC_URL = "https://%s/" % AWS_S3_CUSTOM_DOMAIN
    STATICFILES_STORAGE = 'storages.backends.s3boto.S3BotoStorage'

    Once you have set up the Django application to serve your static files from AWS S3, run the following command to upload the static files from your project to S3.

    $ python manage.py collectstatic --noinput

    or

    $ zappa update dev
    $ zappa manage dev "collectstatic --noinput"

    Check that at least 61 static files have been uploaded to the S3 bucket; the Django admin panel alone is built on 61 static files.

    NOTE: STATICFILES_DIRS must be configured properly so that your files are collected from the appropriate location.
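    For reference, a minimal sketch of that setting; the project root below is a placeholder (locally you would typically derive it from __file__):

    ```python
    # settings.py -- minimal sketch; BASE_DIR is a placeholder for your project root
    import os

    BASE_DIR = "/path/to/your/project"  # commonly: os.path.dirname(os.path.abspath(__file__))

    # Directories outside individual apps that collectstatic should scan.
    STATICFILES_DIRS = [
        os.path.join(BASE_DIR, "static"),
    ]
    ```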

    Tip: Render static files in your templates by loading the static template tag library with {% load static %} and referencing each file with the {% static %} tag.

    Setting Up API Gateway

    To connect to your Django application, you also need to ensure that an API Gateway is set up for your AWS Lambda function. You need GET methods set up for all the URL resources used in your Django application. Alternatively, you can set up a proxy method so that all sub-resources are processed through one API method.

    Go to AWS Lambda function console and add API Gateway from ‘Add triggers’.

    1. Configure API, Deployment Stage, and Security for API Gateway. Click Save once it is done.

    2. Go to API Gateway console and,

    a. Recreate ANY method for / resource.

    i. Check `Use Lambda Proxy integration`

    ii. Set `Lambda Region` and `Lambda Function` and `Save` it.

    b. Recreate ANY method for /{proxy+} resource.

    i. Select `Lambda Function Proxy`

    ii. Set `Lambda Region` and `Lambda Function` and `Save` it.

    3. Click on Actions and select Deploy API. Set the Deployment Stage and click Deploy.

    4. Ensure that the GET and POST methods for / and /{proxy+} are set to Override for this method.

    Setting Up Custom SSL Endpoint

    Optionally, you can also set up your own custom SSL endpoint with Zappa and install a certificate for your domain by running certify:

    $ zappa certify dev
    
    ...
    "certificate_arn": "arn:aws:acm:us-east-1:xxxxxxxxxxxx:certificate/xxxxxxxxxxxx-xxxxxx-xxxx-xxxx-xxxxxxxxxxxxxx",
    "domain": "django-zappa-sample.com"

    Now you are ready to launch your Django Application hosted on AWS Lambda.

    Additional Notes:

    • Once deployed, run `zappa update <stage-name>` to update your already hosted AWS Lambda function.
    • You can inspect the server logs by running the `zappa tail` command.
    • To un-deploy your application, simply run: `zappa undeploy <stage-name>`

    You’ve seen how to deploy a Django application on AWS Lambda using Zappa. If you are creating your Django application for the first time, you might also want to read Edgar Roman’s Django Zappa Guide.

    Start building your Django application and let us know in the comments if you need any help during your application deployment over AWS Lambda.