Tag: elasticsearch

  • Vector Search: The New Frontier in Personalized Recommendations

    Introduction

    Imagine you are a modern-day treasure hunter, not in search of hidden gold, but rather the wealth of knowledge and entertainment hidden within the vast digital ocean of content. In this realm, where every conceivable topic has its own sea of content, discovering what will truly captivate you is like finding a needle in an expansive haystack.

    This challenge leads us to the marvels of recommendation services, acting as your compass in this digital expanse. These services are the unsung heroes behind the scenes of your favorite platforms, from e-commerce sites that suggest enticing products to streaming services that understand your movie preferences better than you might yourself. They sift through immense datasets of user interactions and content features, striving to tailor your online experience to be more personalized, engaging, and enriching.

    But what if I told you that there is a cutting-edge technology that can take personalized recommendations to the next level? Today, I will take you through a journey to build a blog recommendation service that understands the contextual similarities between different pieces of content, transcending beyond basic keyword matching. We’ll harness the power of vector search, a technology that’s revolutionizing personalized recommendations. We’ll explore how recommendation services are traditionally implemented, and then briefly discuss how vector search enhances them.

    Finally, we’ll put this knowledge to work, using OpenAI’s embedding API and Elasticsearch to create a recommendation service that not only finds content but also understands and aligns it with your unique interests.

    Exploring the Landscape: Traditional Recommendation Systems and Their Limits

    Traditionally, these digital compasses, or recommendation systems, employ methods like collaborative and content-based filtering. Imagine sitting in a café where the barista suggests a coffee based on what others with similar tastes enjoyed (collaborative filtering) or based on your past coffee choices (content-based filtering). While these methods have been effective in many scenarios, they come with some limitations. They often stumble when faced with the vast and unstructured wilderness of web data, struggling to make sense of the diverse and ever-expanding content landscape. Additionally, when user preferences are ambiguous or when you want to recommend content by truly understanding it on a semantic level, traditional methods may fall short.

    Enhancing Recommendation with Vector Search and Vector Databases

    Our journey now takes an exciting turn with vector search and vector databases, the modern tools that help us navigate this unstructured data. These technologies transform our café into a futuristic spot where your coffee preference is understood on a deeper, more nuanced level.

    Vector Search: The Art of Finding Similarities

    Vector search operates like a seasoned traveler who understands the essence of every place visited. Text, images, or sounds can be transformed into numerical vectors, like unique coordinates on a map. The magic happens when these vectors are compared, revealing hidden similarities and connections, much like discovering that two seemingly different cities share a similar vibe.

    Vector Databases: Navigating Complex Data Landscapes

    Imagine a vast library of books where each book captures different aspects of a place along with its coordinates. Vector databases are akin to this library, designed to store and navigate these complex data points. They easily handle intricate queries over large datasets, making them perfect for our recommendation service, ensuring no blog worth reading remains undiscovered.

    Embeddings: Semantic Representation

    In our journey, embeddings are akin to a skilled artist who captures not just the visuals but the soul of a landscape. They map items like words or entire documents into real-number vectors, encapsulating their deeper meaning. This helps in understanding and comparing different pieces of content on a semantic level, letting the recommendation service show you things that really match your interests.

    Sample Project: Blog Recommendation Service

    Project Overview

    Now, let’s craft a simple blog recommendation service using OpenAI’s embedding APIs and Elasticsearch as a vector database. The goal is to recommend blogs similar to the current one the user is reading, which can be shown in the read more or recommendation section.

    Our blogs service will be responsible for indexing the blogs, finding similar one,  and interacting with the UI Service.

    Tools and Setup

    We will need the following tools to build our service:

    • OpenAI Account: We will be using OpenAI’s embedding API to generate the embeddings for our blog content. You will need an OpenAI account to use the APIs. Once you have created an account, please create an API key and store it in a secure location.
    • Elasticsearch: A popular database renowned for its full-text search capabilities, which can also be used as a vector database, adept at storing and querying complex embeddings with its dense_vector field type.
    • Docker: A tool that allows developers to package their applications and all the necessary dependencies into containers, ensuring that the application runs smoothly and consistently across different computing environments.
    • Python: A versatile programming language for developers across diverse fields, from web development to data science.

    The APIs will be created using the FastAPI framework, but you can choose any framework.

    Steps

    First, we’ll create a BlogItem class to represent each blog. It has only three fields, which will be enough for this demonstration, but real-world entities would have more details to accommodate a wider range of properties and functionalities.

    class BlogItem:
        blog_id: int
        title: str
        content: str
    
        def __init__(self, blog_id: int, title: str, content: str):
            self.blog_id = blog_id
            self.title = title
            self.content = content

    Elasticsearch Setup:

    • To store the blog data along with its embedding in Elasticsearch, we need to set up a local Elasticsearch cluster and then create an index for our blogs. You can also use a cloud-based version if you have already procured one for personal use.
    • Install Docker or Docker Desktop on your machine and create Elasticsearch and Kibana docker containers using the below docker compose file. Run the following command to create and start the services in the background:
    • docker compose -f /path/to/your/docker-compose/file up -d. 
    • You can exclude the file path if you are in the same directory as your docker-compose.yml file. The advantage of using docker compose is that it allows you to clean up these resources with just one command.
    • docker compose -f /path/to/your/docker-compose/file down.
    version: '3'
    
    services:
    
      elasticsearch:
        image: docker.elastic.co/elasticsearch/elasticsearch:<version>
        container_name: elasticsearch
        environment:
          - node.name=docker-cluster
          - discovery.type=single-node
          - cluster.routing.allocation.disk.threshold_enabled=false
          - bootstrap.memory_lock=true
          - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
          - xpack.security.enabled=true
          - ELASTIC_PASSWORD=YourElasticPassword
          - "ELASTICSEARCH_USERNAME=elastic"
        ulimits:
          memlock:
            soft: -1
            hard: -1
        volumes:
          - esdata:/usr/share/elasticsearch/data
        ports:
          - "9200:9200"
        networks:
          - esnet
    
      kibana:
        image: docker.elastic.co/kibana/kibana:<version>
        container_name: kibana
        environment:
          ELASTICSEARCH_HOSTS: http://elasticsearch:9200
          ELASTICSEARCH_USERNAME: elastic
          ELASTICSEARCH_PASSWORD: YourElasticPassword
        ports:
          - "5601:5601"
        networks:
          - esnet
        depends_on:
          - elasticsearch
    
    networks:
      esnet:
        driver: bridge
    
    volumes:
      esdata:
        driver: local

    • Connect to the local ES instance and create an index. Our “blogs” index will have a unique blog ID, blog title, blog content, and an embedding field to store the vector representation of blog content. The text-embedding-ada-002 model we have used here produces vectors with 1536 dimensions; hence, it’s important to use the same in our embeddings field in the blogs index.
    import logging
    from elasticsearch import Elasticsearch
    
    # Setting up logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(filename)s:%(lineno)d - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    # Elasticsearch client setup
    es = Elasticsearch(hosts="http://localhost:9200",
                       basic_auth=("elastic", "YourElasticPassword"))
    
    
    # Create an index with a mappings for embeddings
    def create_index(index_name: str, embedding_dimensions: int):
        try:
            es.indices.create(
                index=index_name,
                body={
                    'mappings': {
                        'properties': {
                            'blog_id': {'type': 'long'},
                            'title': {'type': 'text'},
                            'content': {'type': 'text'},
                            'embedding': {'type': 'dense_vector', 'dims': embedding_dimensions}
                        }
                    }
                }
            )
        except Exception as e:
            logging.error(f"An error occurred while creating index {index_name} : {e}")
    
    
    # Sample usage
    create_index("blogs", 1536)

    Create Embeddings AND Index Blogs:

    • We use OpenAI’s Embedding API to get a vector representation of our blog title and content. I am using the 002 model here, which is recommended by Open AI for most use cases. The input to the text-embedding-ada-002 should not exceed 8291 tokens (1000 tokens are roughly equal to 750 words) and cannot be empty.
    from openai import OpenAI, OpenAIError
    
    api_key = ‘YourApiKey’
    client = OpenAI(api_key=api_key)
    
    def create_embeddings(text: str, model: str = "text-embedding-ada-002") -> list[float]:
        try:
            text = text.replace("n", " ")
            response = client.embeddings.create(input=[text], model=model)
            logging.info(f"Embedding created successfully")
            return response.data[0].embedding
        except OpenAIError as e:
            logging.error(f"An OpenAI error occurred while creating embedding : {e}")
            raise
        except Exception as e:
            logging.exception(f"An unexpected error occurred while creating embedding  : {e}")
            raise

    • When the blogs get created or the content of the blog gets updated, we will call the create_embeddings function to get text embedding and store it in our blogs index.
    # Define the index name as a global constant
    
    ELASTICSEARCH_INDEX = 'blogs'
    
    def index_blog(blog_item: BlogItem):
        try:
            es.index(index=ELASTICSEARCH_INDEX, body={
                'blog_id': blog_item.blog_id,
                'title': blog_item.title,
                'content': blog_item.content,
                'embedding': create_embeddings(blog_item.title + "n" + blog_item.content)
            })
        except Exception as e:
            logging.error(f"Failed to index blog with blog id {blog_item.blog_id} : {e}")
            raise

    • Create a Pydantic model for the request body:
    class BlogItemRequest(BaseModel):
        title: str
        content: str

    • Create an API to save blogs to Elasticsearch. The UI Service would call this API when a new blog post gets created.
    @app.post("/blogs/")
    def save_blog(response: Response, blog_item: BlogItemRequest) -> dict[str, str]:
        # Create a BlogItem instance from the request data
        try:
            blog_id = get_blog_id()
            blog_item_obj = BlogItem(
                blog_id=blog_id,
                title=blog_item.title,
                content=blog_item.content,
            )
    
            # Call the index_blog method to index the blog
            index_blog(blog_item_obj)
            return {"message": "Blog indexed successfully", "blog_id": str(blog_id)}
        except Exception as e:
            logging.error(f"An error occurred while indexing blog with blog_id {blog_id} : {e}")
            response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
            return {"error": "Failed to index blog"}

    Finding Relevant Blogs:

    • To find blogs that are similar to the current one, we will compare the current blog’s vector representation with other blogs present in the Elasticsearch index using the cosine similarity function.
    • Cosine similarity is a mathematical measure used to determine the cosine of the angle between two vectors in a multi-dimensional space, often employed to assess the similarity between two documents or data points. 
    • The cosine similarity score ranges from -1 to 1. As the cosine similarity score increases from -1 to 1, it indicates an increasing degree of similarity between the vectors. Higher values represent greater similarity.
    • Create a custom exception to handle a scenario when a blog for a given ID is not present in Elasticsearch.
    class BlogNotFoundException(Exception):
        def __init__(self, message="Blog not found"):
            self.message = message
            super().__init__(self.message)

    • First, we will check if the current blog is present in the blogs index and get its embedding. This is done to prevent unnecessary calls to Open AI APIs as it consumes tokens. Then, we would construct an Elasticsearch dsl query to find the nearest neighbors and return their blog content.
    def get_blog_embedding(blog_id: int) -> Optional[Dict]:
        try:
            response = es.search(index=ELASTICSEARCH_INDEX, body={
                'query': {
                    'term': {
                        'blog_id': blog_id
                    }
                },
                '_source': ['title', 'content', 'embedding']  # Fetch title, content and embedding
            })
    
            if response['hits']['hits']:
                logging.info(f"Blog found with blog_id {blog_id}")
                return response['hits']['hits'][0]['_source']
            else:
                logging.info(f"No blog found with blog_id {blog_id}")
                return None
        except Exception as e:
            logging.error(f"Error occurred while searching for blog with blog_id {blog_id}: {e}")
            raise
    
    
    def find_similar_blog(current_blog_id: int, num_neighbors=2) -> list[dict[str, str]]:
        try:
            blog_data = get_blog_embedding(current_blog_id)
            if not blog_data:
                raise BlogNotFoundException(f"Blog not found for id:{current_blog_id}")
            blog_embedding = blog_data['embedding']
            if not blog_embedding:
                blog_embedding = create_embeddings(blog_data['title'] + 'n' + blog_data['content'])
            # Find similar blogs using the embedding
            response = es.search(index=ELASTICSEARCH_INDEX, body={
                'size': num_neighbors + 1,  # Retrieve extra result as we'll exclude the current blog
                '_source': ['title', 'content', 'blog_id', '_score'],
                'query': {
                    'bool': {
                        'must': {
                            'script_score': {
                                'query': {'match_all': {}},
                                'script': {
                                    'source': "cosineSimilarity(params.query_vector, 'embedding')",
                                    'params': {'query_vector': blog_embedding}
                                }
                            }
                        },
                        'must_not': {
                            'term': {
                                'blog_id': current_blog_id  # Exclude the current blog
                            }
                        }
                    }
                }
            })
    
            # Extract and return the hits
            hits = [
                {
                    'title': hit['_source']['title'],
                    'content': hit['_source']['content'],
                    'blog_id': hit['_source']['blog_id'],
                    'score': f"{hit['_score'] * 100:.2f}%"
                }
                for hit in response['hits']['hits']
                if hit['_source']['blog_id'] != current_blog_id
            ]
    
            return hits
        except Exception as e:
            logging.error(f"An error while finding similar blogs: {e}")
            raise

    • Define a Pydantic model for the response:
    class BlogRecommendation(BaseModel):
        blog_id: int
        title: str
        content: str
        score: str

    • Create an API that would be used by UI Service to find similar blogs as the current one user is reading:
    @app.get("/recommend-blogs/{current_blog_id}")
    def recommend_blogs(
            response: Response,
            current_blog_id: int,
            num_neighbors: Optional[int] = 2) -> Union[Dict[str, str], List[BlogRecommendation]]:
        try:
            # Call the find_similar_blog function to get recommended blogs
            recommended_blogs = find_similar_blog(current_blog_id, num_neighbors)
            return recommended_blogs
        except BlogNotFoundException as e:
            response.status_code = status.HTTP_400_BAD_REQUEST
            return {"error": f"Blog not found for id:{current_blog_id}"}
        except Exception as e:
            response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
            return {"error": "Unable to process the request"}

    • The below flow diagram summarizes all the steps we have discussed so far:

    Testing the Recommendation Service

    • Ideally, we would be receiving the blog ID from the UI Service and passing the recommendations back, but for illustration purposes, we’ll be calling the recommend blogs API with some test inputs from my test dataset. The blogs in this sample dataset have concise titles and content, which are sufficient for testing purposes, but real-world blogs will be much more detailed and have a significant amount of data. The test dataset has around 1000 blogs on various categories like healthcare, tech, travel, entertainment, and so on.
    • A sample from the test dataset:

     

    • Test Result 1: Medical Research Blog

      Input Blog: Blog_Id: 1, Title: Breakthrough in Heart Disease Treatment, Content: Researchers have developed a new treatment for heart disease that promises to be more effective and less invasive. This breakthrough could save millions of lives every year.


    • Test Result 2: Travel Blog

      Input Blog: Blog_Id: 4, Title: Travel Tips for Sustainable Tourism, Content: How to travel responsibly and sustainably.


    I manually tested multiple blogs from the test dataset of 1,000 blogs, representing distinct topics and content, and assessed the quality and relevance of the recommendations. The recommended blogs had scores in the range of 87% to 95%, and upon examination, the blogs often appeared very similar in content and style.

    Based on the test results, it’s evident that utilizing vector search enables us to effectively recommend blogs to users that are semantically similar. This approach ensures that the recommendations are contextually relevant, even when the blogs don’t share identical keywords, enhancing the user’s experience by connecting them with content that aligns more closely with their interests and search intent.

    Limitations

    This approach for finding similar blogs is good enough for our simple recommendation service, but it might have certain limitations in real-world applications.

    • Our similarity search returns the nearest k neighbors as recommendations, but there might be scenarios where no similar blog might exist or the neighbors might have significant score differences. To deal with this, you can set a threshold to filter out recommendations below a certain score. Experiment with different threshold values and observe their impact on recommendation quality. 
    • If your use case involves a small dataset and the relationships between user preferences and item features are straightforward and well-defined, traditional methods like content-based or collaborative filtering might be more efficient and effective than vector search.

    Further Improvements

    • Using LLM for Content Validation: Implement a verification step using large language models (LLMs) to assess the relevance and validity of recommended content. This approach can ensure that the suggestions are not only similar in context but also meaningful and appropriate for your audience.
    • Metadata-based Embeddings: Instead of generating embeddings from the entire blog content, utilize LLMs to extract key metadata such as themes, intent, tone, or key points. Create embeddings based on this extracted metadata, which can lead to more efficient and targeted recommendations, focusing on the core essence of the content rather than its entirety.

    Conclusion

    Our journey concludes here, but yours is just beginning. Armed with the knowledge of vector search, vector databases, and embeddings, you’re now ready to build a recommendation service that doesn’t just guide users to content but connects them to the stories, insights, and experiences they seek. It’s not just about building a service; it’s about enriching the digital exploration experience, one recommendation at a time.

  • Elasticsearch – Basic and Advanced Concepts

    What is Elasticsearch?

    In our previous blog, we have seen Elasticsearch is a highly scalable open-source full-text search and analytics engine, built on the top of Apache Lucene. Elasticsearch allows you to store, search, and analyze huge volumes of data as quickly as possible and in near real-time.

    Basic Concepts –

    • Index – Large collection of JSON documents. Can be compared to a database in relational databases. Every document must reside in an index.
    • Shards – Since, there is no limit on the number of documents that reside in an index, indices are often horizontally partitioned as shards that reside on nodes in the cluster. 
      Max documents allowed in a shard = 2,147,483,519 (as of now)
    • Type – Logical partition of an index. Similar to a table in relational databases. 
    • Fields – Similar to a column in relational databases. 
    • Analyzers – Used while indexing/searching the documents. These contain “tokenizers” that split phrases/text into tokens and “token-filters”, that filter/modify tokens during indexing & searching.
    • Mappings – Combination of Field + Analyzers. It defines how your fields can be stored & indexed.

    Inverted Index

    ES uses Inverted Indexes under the hood. Inverted Index is an index which maps terms to documents containing them.

    Let’s say, we have 3 documents :

    1. Food is great
    2. It is raining
    3. Wind is strong

    An inverted index for these documents can be constructed as –

    The terms in the dictionary are stored in a sorted order to find them quickly.

    Searching multiple terms is done by performing a lookup on the terms in the index. It performs either UNION or INTERSECTION on them and fetches relevant matching documents.

    An ES Index is spanned across multiple shards, each document is routed to a shard in a round–robin fashion while indexing. We can customize which shard to route the document, and which shard search-requests are sent to.

    ES Index is made of multiple Lucene indexes, which in turn, are made up of index segments. These are write once, read many types of indices, i.e the index files Lucene writes are immutable (except for deletions).

    Analyzers –

    Analysis is the process of converting text into tokens or terms which are added to the inverted index for searching. Analysis is performed by an analyzer. An analyzer can be either a built-in or a custom. 

    We can define single analyzer for both indexing & searching, or a different search-analyzer and an index-analyzer for a mapping.

    Building blocks of analyzer- 

    • Character filters – receives the original text as a stream of characters and can transform the stream by adding, removing, or changing characters.
    • Tokenizers – receives a stream of characters, breaks it up into individual tokens. 
    • Token filters – receives the token stream and may add, remove, or change tokens.

    Some Commonly used built-in analyzers –

    1. Standard –

    Divides text into terms on word boundaries. Lower-cases all terms. Removes punctuation and stopwords (if specified, default = None).

    Text:  The 2 QUICK Brown-Foxes jumped over the lazy dog’s bone.

    Output: [the, 2, quick, brown, foxes, jumped, over, the, lazy, dog’s, bone]

    2. Simple/Lowercase –

    Divides text into terms whenever it encounters a non-letter character. Lower-cases all terms.

    Text: The 2 QUICK Brown-Foxes jumped over the lazy dog’s bone.

    Output: [ the, quick, brown, foxes, jumped, over, the, lazy, dog, s, bone ]

    3. Whitespace –

    Divides text into terms whenever it encounters a white-space character.

    Text: The 2 QUICK Brown-Foxes jumped over the lazy dog’s bone.

    Output: [ The, 2, QUICK, Brown-Foxes, jumped, over, the, lazy, dog’s, bone.]

    4. Stopword –

    Same as simple-analyzer with stop word removal by default.

    Text: The 2 QUICK Brown-Foxes jumped over the lazy dog’s bone.

    Output: [ quick, brown, foxes, jumped, over, lazy, dog, s, bone]

    5. Keyword / NOOP –

    Returns the entire input string as it is.

    Text: The 2 QUICK Brown-Foxes jumped over the lazy dog’s bone.

    Output: [The 2 QUICK Brown-Foxes jumped over the lazy dog’s bone.]

    Some Commonly used built-in tokenizers –

    1. Standard –

    Divides text into terms on word boundaries, removes most punctuation.

    2. Letter –

    Divides text into terms whenever it encounters a non-letter character.

    3. Lowercase –

    Letter tokenizer which lowercases all tokens.

    4. Whitespace –

    Divides text into terms whenever it encounters any white-space character.

    5. UAX-URL-EMAIL –

    Standard tokenizer which recognizes URLs and email addresses as single tokens.

    6. N-Gram –

    Divides text into terms when it encounters anything from a list of specified characters (e.g. whitespace or punctuation), and returns n-grams of each word: a sliding window of continuous letters, e.g. quick → [qu, ui, ic, ck, qui, quic, quick, uic, uick, ick].

    7. Edge-N-Gram –

    It is similar to N-Gram tokenizer with n-grams anchored to the start of the word (prefix- based NGrams). e.g. quick → [q, qu, qui, quic, quick].

    8. Keyword –

    Emits exact same text as a single term.

    Make your mappings right –

    Analyzers if not made right, can increase your search time extensively. 

    Avoid using regular expressions in queries as much as possible. Let your analyzers handle them.

    ES provides multiple tokenizers (standard, whitespace, ngram, edge-ngram, etc) which can be directly used, or you can create your own tokenizer. 

    A simple use-case where we had to search for a user who either has “brad” in their name or “brad_pitt” in their email (substring based search), one would simply go and write a regex for this query, if no proper analyzers are written for this mapping.

    {
      "query": {
        "bool": {
          "should": [
            {
              "regexp": {
                "email.raw": ".*brad_pitt.*"
              }
            },
            {
              "regexp": {
                "name.raw": ".*brad.*"
              }
            }
          ]
        }
      }
    }

    This took 16s for us to fetch 1 lakh out of 60 million documents

    Instead, we created an n-gram analyzer with lower-case filter which would generate all relevant tokens while indexing.

    The above regex query was updated to –

    {
      "query": {
        "bool": {
          "multi_match": {
            "query": "brad",
            "fields": [
              "email.suggestion",
              "full_name.suggestion"
            ]
          }
        }
      },
      "size": 25
    }

    This took 109ms for us to fetch 1 lakh out of 60 million documents

    Thus, previous search query which took more than 10-25s got reduced to less than 800-900ms to fetch the same set of records.

    Had the use-case been to search results where name starts with “brad” or email starts with “brad_pitt” (prefix based search), it is better to go for edge-n-gram analyzer or suggesters.

    Performance Improvement with Filter Queries –

    Use Filter queries whenever possible. 

    ES usually scores documents and returns them in sorted order as per their scores. This may take a hit on performance if scoring of documents is not relevant to our use-case. In such scenarios, use “filter” queries which give boolean scores to documents.

    {
      "query": {
        "bool": {
          "multi_match": {
            "query": "brad",
            "fields": [
              "email.suggestion",
              "full_name.suggestion"
            ]
          }
        }
      },
      "size": 25
    }

    Above query can now be written as –

    {
      "query": {
        "bool": {
          "filter": {
            "bool": {
              "must": [
                {
                  "multi_match": {
                    "query": "brad",
                    "fields": [
                      "email.suggestion",
                      "full_name.suggestion"
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      "size": 25
    }

    This will reduce query-time by a few milliseconds.

    Re-indexing made faster –

    Before creating any mappings, know your use-case well.

    ES does not allow us to alter existing mappings unlike “ALTER” command in relational databases, although we can keep adding new mappings to the index. 

    The only way to change existing mappings is by creating a new index, re-indexing existing documents and aliasing the new-index with required name with ZERO downtime on production. Note – This process can take days if you have millions of records to re-index.

    To re-index faster, we can change a few settings  –

    1. Disable swapping – Since no requests will be directed to the new index till indexing is done, we can safely disable swap
    Command for Linux machines –

    sudo swapoff -a

    2. Disable refresh_interval for ES – Default refresh_interval is 1s which can safely be disabled while documents are getting re-indexed.

    3. Change bulk size while indexing – ES usually indexes documents in chunks of size 1k. It is preferred to increase this default size to approx 5 to 10K, although we need to find the sweet spot while reindexing to avoid load on current index.

    4. Reset replica count to 0  – ES creates at least 1 replica per shard, by default. We can set this to 0 while indexing & reset it to required value post indexing.

    Conclusion

    ElasticSearch is a very powerful database for text-based searches. The Elastic ecosystem is widely used for reporting, alerting, machine learning, etc. This article just gives an overview of ElasticSearch mappings and how creating relevant mappings can improve your query performance & accuracy. Giving right mappings, right resources to your ElasticSearch cluster can do wonders.

  • Improving Elasticsearch Indexing in the Rails Model using Searchkick

    Searching has become a prominent feature of any web application, and a relevant search feature requires a robust search engine. The search engine should be capable of performing a full-text search, auto completion, providing suggestions, spelling corrections, fuzzy search, and analytics. 

    Elasticsearch, a distributed, fast, and scalable search and analytic engine, takes care of all these basic search requirements.

    The focus of this post is using a few approaches with Elasticsearch in our Rails application to reduce time latency for web requests. Let’s review one of the best ways to improve the Elasticsearch indexing in Rails models by moving them to background jobs.

    In a Rails application, Elasticsearch can be integrated with any of the following popular gems:

    We can continue with any of these gems mentioned above. But for this post, we will be moving forward with the Searchkick gem, which is a much more Rails-friendly gem.

    The default Searchkick gem option uses the object callbacks to sync the data in the respective Elasticsearch index. Being in the callbacks, it costs the request, which has the creation and updation of a resource to take additional time to process the web request.

    The below image shows logs from a Rails application, captured for an update request of a user record. We have added a print statement before Elasticsearch tries to sync in the Rails model so that it helps identify from the logs where the indexing has started. These logs show that the last two queries were executed for indexing the data in the Elasticsearch index.

    Since the Elasticsearch sync is happening while updating a user record, we can conclude that the user update request will take additional time to cover up the Elasticsearch sync.

    Below is the request flow diagram:

    From the request flow diagram, we can say that the end-user must wait for step 3 and 4 to be completed. Step 3 is to fetch the children object details from the database.

    To tackle the problem, we can move the Elasticsearch indexing to the background jobs. Usually, for Rails apps in production, there are separate app servers, database servers, background job processing servers, and Elasticsearch servers (in this scenario).

    This is how the request flow looks when we move Elasticsearch indexing:

    Let’s get to coding!

    For demo purposes, we will have a Rails app with models: `User` and `Blogpost`. The stack used here:

    • Rails 5.2
    • Elasticsearch 6.6.7
    • MySQL 5.6
    • Searchkick (gem for writing Elasticsearch queries in Ruby)
    • Sidekiq (gem for background processing)

    This approach does not require  any specific version of Rails, Elasticsearch or Mysql. Moreover, this approach is database agnostic. You can go through the code from this Github repo for reference.

    Let’s take a look at the user model with Elasticsearch index.

    # == Schema Information
    #
    # Table name: users
    #
    #  id            :bigint           not null, primary key
    #  name          :string(255)
    #  email         :string(255)
    #  mobile_number :string(255)
    #  created_at    :datetime         not null
    #  updated_at    :datetime         not null
    #
    class User < ApplicationRecord
     searchkick
    
     has_many :blogposts
     def search_data
       {
         name: name,
         email: email,
         total_blogposts: blogposts.count,
         last_published_blogpost_date: last_published_blogpost_date
       }
     end
     ...
    end

    Anytime a user object is inserted, updated, or deleted, Searchkick reindexes the data in the Elasticsearch user index synchronously.

    Searchkick already provides four ways to sync Elasticsearch index:

    • Inline (default)
    • Asynchronous
    • Queuing
    • Manual

    For more detailed information on this, refer to this page. In this post, we are looking in the manual approach to reindex the model data.

    To manually reindex, the user model will look like:

    class User < ApplicationRecord
     searchkick callbacks: false
    
     def search_data
       ...
     end
    end

    Now, we will need to define a callback that can sync the data to the Elasticsearch index. Typically, this callback must be written in all the models that have the Elasticsearch index. Instead, we can write a common concern and include it to required models.

    Here is what our concern will look like:

    module ElasticsearchIndexer
     extend ActiveSupport::Concern
    
     included do
       after_commit :reindex_model
       def reindex_model
         ElasticsearchWorker.perform_async(self.id, self.class.name)
       end
     end
    end

    In the above active support concern, we have called the Sidekiq worker named ElasticsearchWorker. After adding this concern, don’t forget to include the Elasticsearch indexer concern in the user model, like so:

    include ElasticsearchIndexer

    Now, let’s see the Elasticsearch Sidekiq worker:

    class ElasticsearchWorker
     include Sidekiq::Worker
     def perform(id, klass)
       begin
         klass.constantize.find(id.to_s).reindex
       rescue => e
         # Handle exception
       end
     end
    end

    That’s it, we’ve done it. Cool, huh? Now, whenever a user creates, updates, or deletes web request, a background job will be created. The background job can be seen in the Sidekiq web UI at localhost:3000/sidekiq

    Now, there is little problem in the Elasticsearch indexer concern. To reproduce this, go to your user edit page, click save, and look at localhost:3000/sidekiq—a job will be queued.

    We can handle this case by tracking the dirty attributes. 

    module ElasticsearchIndexer
     extend ActiveSupport::Concern
     included do
       after_commit :reindex_model
       def reindex_model
         return if self.previous_changes.keys.blank?
         ElasticsearchWorker.perform_async(self.id, klass)
       end
     end
    end

    Furthermore, there are few more areas of improvement. Suppose you are trying to update the field of user model that is not part of the Elasticsearch index, the Elasticsearch worker Sidekiq job will still get created and reindex the associated model object. This can be modified to create the Elasticsearch indexing worker Sidekiq job only if the Elasticsearch index fields are updated.

    module ElasticsearchIndexer
     extend ActiveSupport::Concern
     included do
       after_commit :reindex_model
       def reindex_model
         updated_fields = self.previous_changes.keys
        
         # For getting ES Index fields you can also maintain constant
         # on model level or get from the search_data method.
         es_index_fields = self.search_data.stringify_keys.keys
         return if (updated_fields & es_index_fields).blank?
         ElasticsearchWorker.perform_async(self.id, klass)
       end
     end
    end

    Conclusion

    Moving the Elasticsearch indexing to background jobs is a great way to boost the performance of the web app by reducing the response time of any web request. Implementing this approach for every model would not be ideal. I would recommend this approach only if the Elasticsearch index data are not needed in real-time.

    Since the execution of background jobs depends on the number of jobs it must perform, it might take time to reflect the changes in the Elasticsearch index if there are lots of jobs queued up. To solve this problem to some extent, the Elasticsearch indexing jobs can be added in a queue with high priority. Also, make sure you have a different app server and background job processing server. This approach works best if the app server is different than the background job processing server.

  • Monitoring a Docker Container with Elasticsearch, Kibana, and Metricbeat

    Since you are on this page, you have probably already started using Docker to deploy your applications and are enjoying it compared to virtual machines, because of it being lightweight, easy to deploy and its exceptional security management features.

    And, once the applications are deployed, monitoring your containers and tracking their activities in real time is very essential. Imagine a scenario where you are managing one or many virtual machines. Your pre-configured session will be doing everything, including monitoring. If you face any problems during production, then—with a handful of commands such as top, htop, iotop, and with flags like -o, %CPU, and %MEM—you are good to troubleshoot the issue.

    On the other hand, consider a scenario where you have the same nodes spread across 100-200 containers. You will need to see all activity in one place to query for information about what happened. Here, monitoring comes into the picture. We will be discussing more benefits as we move further.

    This blog will cover Docker monitoring with Elasticsearch, Kibana, and Metricbeat. Basically, Elasticsearch is a platform that allows us to have distributed search and analysis of data in real-time along with visualization. We’ll be discussing how all these work interdependently as we move ahead. Like Elasticsearch, Kibana is also open-source software. Kibana is an interface mainly used to visualize the data sent from Elasticsearch. Metricbeat is a lightweight shipper of collected metrics from your system to the desired target (Elasticsearch in this case). 

    What is Docker Monitoring?

    In simple terms, monitoring containers is how we keep track of the above metrics and analyze them to ensure the performance of applications built on microservices and to keep track of issues so that they can be solved more easily. This monitoring is vital for performance improvement and optimization and to find the RCA of various issues.

    There is a lot of software available for monitoring the Docker container, both open-source as well as proprietary, like Prometheus, AppOptics, Metricbeats, Datadog, Sumologic, etc.

    You can choose any of these based on convenience. 

    Why is Docker Monitoring needed?

    1. Monitoring helps early detection and to fix issues to avoid a breakdown during production
    2. New feature additions/updates implemented safely as the entire application is monitored
    3. Docker monitoring is beneficial for developers, IT pros, and enterprises as well.
    • For developers, Docker monitoring tracks bugs and helps to resolve them quickly along with enhancing security.
    • For IT pros, it helps with flexible integration of existing processes and enterprise systems and satisfies all the requirements.
    • For enterprises, it helps to build the application within a certified container within a secured ecosystem that runs smoothly. 

    Elasticsearch is a platform that allows us to have distributed search and analysis of data in real-time, along with visualization. Elasticsearch is free and open-source software. It goes well with a huge number of technologies, like Metricbeat, Kibana, etc. Let’s move onto the installation of Elasticsearch.

    Installation of Elasticsearch:

    Prerequisite: Elasticsearch is built in Java. So, make sure that your system at least has Java8 to run Elasticsearch.

    For installing Elasticsearch for your OS, please follow the steps at Installing Elasticsearch | Elasticsearch Reference [7.11].

    After installing,  check the status of Elasticsearch by sending an HTTP request on port 9200 on localhost.

    http://localhost:9200/

    This will give you a response as below:

    You can configure Elasticsearch by editing $ES_HOME/config/elasticsearch.yml 

    Learn more about configuring Elasticsearch here.

    Now, we are done with the Elasticsearch setup and are ready to move onto Kibana.

    Kibana:

    Like Elasticsearch, Kibana is also open-source software. Kibana is an interface mainly used to visualize the data from Elasticsearch. Kibana allows you to do anything via query and let’s you generate numerous visuals as per your requirements. Kibana lets you visualize enormous amounts of data in terms of line graphs, gauges, and all other graphs.

    Let’s cover the installation steps of Kibana.

    Installing Kibana

    Prerequisites: 

    • Must have Java1.8+ installed 
    • Elasticsearch v1.4.4+
    • Web browser such as Chrome, Firefox

    For installing Kibana with respect to your OS, please follow the steps at Install Kibana | Kibana Guide [7.11]

    Kibana runs on default port number 5601. Just send an HTTP request to port 5601 on localhost with http://localhost:5601/ 

    You should land on the Kibana dashboard, and it is now ready to use:

    You can configure Kibana by editing $KIBANA_HOME/config. For more about configuring Kibana, visit here.

    Let’s move onto the final part—setting up with Metricbeat.

    Metricbeat

    Metricbeat sends metrics frequently, and we can say it’s a lightweight shipper of collected metrics from your system.

    You can simply install Metricbeat to your system or servers to periodically collect metrics from the OS and the microservices running on services. The collected metrics are shipped to the output you specified, e.g., Elasticsearch, Logstash. 

    Installing Metricbeat

    For installing Metricbeat according to your OS, follow the steps at Install Kibana | Kibana Guide [7.11]

    As soon as we start the Metricbeat service, it sends Docker metrics to the Elasticsearch index, which can be confirmed by curling Elasticsearch indexes with the command:

    curl -XGET 'localhost:9200/_cat/indices?v&pretty'

    How Are They Internally Connected?

    We have now installed all three and they are up and running. As per the period mentioned, docker.yml will hit the Docker API and send the Docker metrics to Elasticsearch. Those metrics are now available in different indexes of Elasticsearch. As mentioned earlier, Kibana queries the data of Elasticsearch and visualizes it in the form of graphs. In this, all three are connected. 

    Please refer to the flow chart for more clarification:

    How to Create Dashboards?

    Now that we are aware of how these three tools work interdependently, let’s create dashboards to monitor our containers and understand those. 

    First of all, open the Dashboards section on Kibana (localhost:5601/) and click the Create dashboard button:

     

    You will be directed to the next page:

    Choose the type of visualization you want from all options:

    For example, let’s go with Lens

    (Learn more about Kibana Lens)

    Here, we will be looking for the number of containers vs. timestamps by selecting the timestamp on X-axis and the unique count of docker.container.created on Y-axis.

    As soon we have selected both parameters, it will generate a graph as shown in the snapshot, and we will be getting the count of created containers (here Count=1). If you create move containers on your system, when that data metric is sent to Kibana, the graph and the counter will be modified. In this way, you can monitor how many containers are created over time. In similar fashion, depending on your monitoring needs, you can choose a parameter from the left panel showing available fields like: 

    activemq.broker.connections.count

    docker.container.status

    Docker.container.tags

    Now, we will show one more example of how to create a bar graph:

    As mentioned above, to create a bar graph just choose “vertical bar” from the above snapshot. Here, I’m trying to get a bar graph for the count of documents vs. metricset names, such as network, file system, cpu, etc. So, as shown in the snapshot on the left, choose the Y-axis parameter as count and X-axis parameter as metricset.name as shown in the right side of the snapshot

    After hitting enter, a graph will be generated: 

    Similarly, you can try it out with multiple parameters with different types of graphs to monitor. Now, we will move onto the most important and widely used monitoring tool to track warnings, errors, etc., which is DISCOVER.

    Discover for Monitoring:

    Basically, Discover provides deep insights into data, showing you where you can apply searches and filters as well. With it, you can show which processes are taking more time and show only those. Filter out errors occurring with the message filter with a value of ERROR. Check the health of the container; check for logged-in users. These kinds of queries can be sent and the desired results can be achieved, leading to good monitoring of containers, same as the SQL queries. 

    [More about Discover here.]

    To apply filters, just click on the “filter by type” from the left panel, and you will see all available filtering options. From there, you can select one as per your requirements, and view those on the central panel. 

    Similar to filter, you can choose fields to be shown on the dashboard from the left panel with “Selected fields” right below the filters. (Here, we have only selected info for Source.)

    Now, if you take a look at the top part of the snapshot, you will find the search bar. This is the most useful part of Discover for monitoring.

    In that bar, you just need to put a query, and according to that query, logs will be filtered. For example, I will be putting a query for error messages equal to No memory stats data available.

    When we hit the update button on the right side, only logs containing that error message will be there and highlighted for differentiation, as shown in the snapshot. All other logs will not be shown. In this way, you can track a particular error and ensure that it does not exist after fixing it.

    In addition to query, it also provides keyword search. So, if you input a word like warning, error, memory, or user, then it will provide logs for that word, like “memory” in the snapshot:

     

    Similar to Kibana, we also receive logs in the terminal. For example, the following highlighted portion is about the state of your cluster. In the terminal, you can put a simple grep command for required logs. 

    With this, you can monitor Docker containers with multiple queries, such as nested queries for the Discover facility. There are many different graphs you can try depending on your requirements to keep your application running smoothly.

    Conclusion

    Monitoring requires a lot of time and effort. What we have seen here is a drop in the ocean. For some next steps, try:

    1. Monitoring network
    2. Aggregating logs from your different applications
    3. Aggregating logs from multiple containers
    4. Alerts setting and monitoring
    5. Nested queries for logs