How to display AWS CloudWatch logs in Streamlit

Let's dive into the following scenario:

  • we have some job/task running on AWS
  • we have already built a Streamlit frontend to launch jobs
  • we want to monitor AWS CloudWatch logs generated by the job execution
  • we want neither to switch from our Streamlit frontend to the AWS Console, nor to go crazy chasing the right log groups/streams to track our job

A possible custom solution is presented below.

Requirements

streamlit        # to build monitor frontend
boto3            # to interact with CloudWatch
pandas           # to manipulate logs as dataframes
pytz             # to set local timezones to logs timestamp
streamlit-aggrid # to enhance logs viz experience
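
Everything above can be installed in one go, e.g. by saving the list as a requirements.txt file (the filename is just a convention) and running pip install -r requirements.txt.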

Source code

The solution consists of a Streamlit monitor that tracks the last job execution (a custom widget to filter executions by date would be a straightforward addition; a rough sketch of one is given after the listing).

import os
from datetime import datetime
from typing import Optional

import boto3
import pandas as pd
import pytz
import streamlit as st
from st_aggrid import AgGrid, GridOptionsBuilder, JsCode


def gmt2rome(date: str, fmt: str = "%a, %d %b %Y %H:%M:%S GMT") -> datetime:
    """Convert GMT timestamp to EU/Rome.

    Args:
        date (str): GMT timestamp string to convert.
        fmt (str, optional): format of the input timestamp. Defaults to "%a, %d %b %Y %H:%M:%S GMT".

    Returns:
        EU/Rome datetime
    """
    gmt = pytz.timezone("GMT")
    return gmt.localize(datetime.strptime(date, fmt)).astimezone(pytz.timezone('Europe/Rome'))


def get_last_log(cloudwatch: boto3.client, log_group: str) -> Optional[pd.DataFrame]:
    """Build most recent logs dataframe for the given AWS resource.

    # SEE: https://gist.github.com/eldondevcg/fffff4b7909351b19a53 for credits.

    Args:
        cloudwatch (boto3.client): cloudwatch client.
        log_group (str): log group name.

    Returns:
        Most recent logs dataframe, or None if the log group has no streams.
    """
    # Initialize stream batch
    stream_batch = cloudwatch.describe_log_streams(
        logGroupName=log_group, orderBy='LastEventTime')

    all_streams = []
    # Retrieve all streams
    all_streams += stream_batch['logStreams']
    while 'nextToken' in stream_batch:
        stream_batch = cloudwatch.describe_log_streams(
            logGroupName=log_group, nextToken=stream_batch['nextToken'], orderBy='LastEventTime')
        all_streams += stream_batch['logStreams']

    if all_streams:
        # Select only the last stream
        stream = all_streams[-1]
        st.info(f"**Log stream name**: `{stream['logStreamName']}`")
        # Get log events. Note: get_log_events does not return a plain 'nextToken';
        # pagination relies on 'nextForwardToken' and stops when the token stops changing.
        events = []
        next_token = None
        while True:
            kwargs = dict(
                logGroupName=log_group,
                logStreamName=stream['logStreamName'],
                startFromHead=True)
            if next_token:
                kwargs['nextToken'] = next_token
            logs_batch = cloudwatch.get_log_events(**kwargs)
            events += logs_batch['events']
            if logs_batch['nextForwardToken'] == next_token:
                break
            next_token = logs_batch['nextForwardToken']

        # Return dataframe with log timestamp and message
        df = pd.DataFrame(
            [{k: v for k, v in event.items() if k in ['timestamp', 'message']}
             for event in events]
        )
        # CloudWatch timestamps are expressed in milliseconds since the epoch (UTC)
        df['timestamp'] = df['timestamp'].apply(
            lambda x: gmt2rome(
                str(datetime.utcfromtimestamp(x / 1000.)).split('.')[0],
                fmt='%Y-%m-%d %H:%M:%S'))
        return df


def get_log_groups(cloudwatch: boto3.client, prefix: str, env: Optional[str]) -> list:
    """Retrieve log groups given prefix.

    Args:
        cloudwatch (boto3.client): cloudwatch client.
        prefix (str): log group prefix.
        env (Optional[str]): environment suffix used to filter log group names, if any.

    Returns:
        Matching log groups.
    """
    group_batch = cloudwatch.describe_log_groups(logGroupNamePrefix=prefix)
    all_groups = []
    all_groups += group_batch['logGroups']
    while 'nextToken' in group_batch:
        group_batch = cloudwatch.describe_log_groups(
            logGroupNamePrefix=prefix, nextToken=group_batch['nextToken'])
        all_groups += group_batch['logGroups']
    if env:
        return list(filter(lambda x: f'-{env}' in x, map(lambda x: x['logGroupName'], all_groups)))
    else:
        return list(map(lambda x: x['logGroupName'], all_groups))


def _configure_aggrid(df: pd.DataFrame) -> GridOptionsBuilder:
    """Configure AgGrid options.

    Args:
        df (pd.DataFrame): dataframe to display.

    Returns:
        Configured options builder.
    """
    # Builder initialization
    gb = GridOptionsBuilder.from_dataframe(df)
    # Streamlit dark theme options
    gb.configure_grid_options(
        rowStyle={'color': '#FAFAFA', 'background': '#0E1117'})
    # (Optional) custom js code to inject to conditionally highlight log records
    # FIXME: as of now, unable to inject string to match via f-string
    # due to AgGrid custom component error... defaults to 'ERROR'
    highlight_style_jscode = JsCode('''
    function(params) {
        if (params.value.includes('ERROR')) {
            return {
                'color': 'white',
                'backgroundColor': '#f63366'
            }
        } else {
            return {
                'color': 'white',
                'backgroundColor': '#0E1117'
            }
        }
    };
    ''')
    gb.configure_column("message", cellStyle=highlight_style_jscode)
    return gb


def display_logs(df: pd.DataFrame) -> AgGrid:
    """Display logs via AgGrid.

    Args:
        df (pd.DataFrame): logs dataframe.

    Returns:
        AgGrid table.
    """
    return AgGrid(
        df,
        gridOptions=_configure_aggrid(df).build(),
        allow_unsafe_jscode=True
    )


def load_sidebar():
    """Render the sidebar and return the inferred environment plus a CloudWatch Logs client."""
    with st.sidebar:
        st.markdown('# ☁️ AWS CloudWatch Logs')
        profile = st.text_input('AWS named profile', value='')
        try:
            session = boto3.session.Session(
                profile_name=profile,
                region_name=os.environ.get('AWS_REGION', 'eu-central-1')
                )
            # Infer the environment from the profile name suffix (e.g. 'myteam-dev' -> 'dev')
            env = profile.split('-')[-1]
            return env, session.client('logs')
        except Exception as e:
            st.error(e)
            return None, None


def main():
    """Streamlit monitor entry point."""
    env, cloudwatch = load_sidebar()
    if env and cloudwatch:
        cols = st.columns(2)
        with cols[0]:
            resource_type = st.text_input(
                'Resource type', 
                help='AWS resource type as per log groups naming (e.g. `lambda`, `codebuild`, `ecs`, ...)'
                )
        with cols[1]:
            resource_prefix = st.text_input(
                'Common prefix',
                help='Possible common prefix to resources (e.g. use case name)'
                )
        with cols[0]:
            env = env if st.checkbox(f'Filter log groups containing -{env}') else None

        if resource_type:
            prefix = f'/aws/{resource_type}/'
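            # e.g. resource_type='lambda' and resource_prefix='forecast' (illustrative values)
            # would yield the full prefix '/aws/lambda/forecast-'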
            if resource_prefix:
                prefix += f'{resource_prefix}-'
            matching_groups = get_log_groups(cloudwatch, prefix, env)
            if matching_groups:
                format_group = lambda x: x.replace(prefix, '').replace(f'-{env}', '')
                log_group = st.selectbox(
                    'Matching resources', 
                    options=matching_groups, 
                    format_func=format_group)

                # Log display
                last_logs = get_last_log(cloudwatch, log_group)
                if last_logs is not None:
                    st.markdown(f'## Most recent log for `{format_group(log_group)}`')
                    display_logs(last_logs)
                else:
                    st.error(f"No streams for log group `{log_group}`")
            else:
                st.error('No matching log groups for given resource type and prefix.')


if __name__ == "__main__":
    st.set_page_config(page_title='AWS CloudWatch logs',
                       page_icon='☁️', layout='wide')
    main()
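
As mentioned above, a widget to filter executions by date is a natural extension. Below is a rough, untested sketch of the idea: let the user pick a day via st.date_input and keep only the streams whose last event falls on that day, instead of always taking the most recent one. The helper name select_streams_by_date is made up for illustration; lastEventTimestamp is the millisecond timestamp returned by describe_log_streams.

from datetime import date, datetime, timezone

import streamlit as st


def select_streams_by_date(all_streams: list) -> list:
    """Keep only the streams whose last event happened on the day picked by the user (UTC)."""
    day = st.date_input('Execution date', value=date.today())
    return [
        s for s in all_streams
        if datetime.fromtimestamp(
            s.get('lastEventTimestamp', 0) / 1000., tz=timezone.utc).date() == day
    ]

In get_last_log, the filtered list returned by such a helper could then replace all_streams before picking the stream to display.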

Setup and usage

Given a virtual environment with the above requirements installed, and the AWS CLI configured with (at least) one named profile, you should be good to go.
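
For instance, assuming the listing above is saved as cloudwatch_logs.py (the filename is arbitrary), the monitor can be started with streamlit run cloudwatch_logs.py.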

Usage:

  1. in the sidebar you can insert the name of a named profile as per ~/.aws/credentials (a quick way to check the profile outside Streamlit is sketched after this list)
  2. in the Resource type text input you can insert the "type" of resource whose logs you want to retrieve (e.g. lambda, ecs, ...)
  3. you can further filter log groups by requesting a match with a common prefix
  4. matching resources will be available in the subsequent selectbox
  5. selected resource logs will be shown through AgGrid, with (possible) error lines highlighted by default
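
As a quick sanity check outside Streamlit, you can verify that a named profile can actually reach CloudWatch Logs before launching the monitor. The profile name and region below are placeholders to be replaced with your own; describe_log_groups and its limit parameter are standard boto3 calls.

import boto3

# Placeholder profile/region: use the ones configured in ~/.aws/credentials
session = boto3.session.Session(profile_name='my-profile-dev', region_name='eu-central-1')
logs = session.client('logs')
# List a handful of log groups to confirm credentials and permissions work
for group in logs.describe_log_groups(limit=5)['logGroups']:
    print(group['logGroupName'])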