# How to display AWS CloudWatch logs in Streamlit
Let's dive into the following scenario:
- we have some job/task running on AWS
- we have already built a Streamlit frontend to launch jobs
- we want to monitor AWS CloudWatch logs generated by the job execution
- we want neither to switch back and forth between our Streamlit frontend and the AWS Console, nor to drive ourselves crazy hunting for the right log groups/streams to track our job
A possible custom solution is presented below.
## Requirements
```text
streamlit        # to build the monitor frontend
boto3            # to interact with CloudWatch
pandas           # to manipulate logs as dataframes
pytz             # to set local timezones on log timestamps
streamlit-aggrid # to enhance the log viewing experience
```
## Source code
The solution consists of a Streamlit monitor that tracks the last job execution (a custom widget to filter executions by date would be a straightforward addition; a sketch is given after the listing).
```python
import os
from datetime import datetime
from typing import Optional
import boto3
import pandas as pd
import pytz
import streamlit as st
from st_aggrid import AgGrid, GridOptionsBuilder, JsCode
def gmt2rome(date: str, fmt: str = "%a, %d %b %Y %H:%M:%S GMT") -> datetime:
"""Convert GMT timestamp to EU/Rome.
Args:
        date (str): GMT timestamp string to convert.
        fmt (str, optional): parsing format. Defaults to "%a, %d %b %Y %H:%M:%S GMT".
Returns:
EU/Rome datetime
"""
gmt = pytz.timezone("GMT")
return gmt.localize(datetime.strptime(date, fmt)).astimezone(pytz.timezone('Europe/Rome'))
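# Example: gmt2rome('Mon, 01 Jan 2024 12:00:00 GMT') -> 2024-01-01 13:00:00+01:00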
def get_last_log(cloudwatch: boto3.client, log_group: str) -> Optional[pd.DataFrame]:
"""Build most recent logs dataframe for the given AWS resource.
# SEE: https://gist.github.com/eldondevcg/fffff4b7909351b19a53 for credits.
Args:
cloudwatch (boto3.client): cloudwatch client.
log_group (str): log group name.
Returns:
        Most recent logs dataframe, or None if the log group has no streams.
"""
# Initialize stream batch
stream_batch = cloudwatch.describe_log_streams(
logGroupName=log_group, orderBy='LastEventTime')
all_streams = []
# Retrieve all streams
all_streams += stream_batch['logStreams']
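    # 'nextToken' is only present in the response while more pages remain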
while 'nextToken' in stream_batch:
stream_batch = cloudwatch.describe_log_streams(
logGroupName=log_group, nextToken=stream_batch['nextToken'], orderBy='LastEventTime')
all_streams += stream_batch['logStreams']
if all_streams:
# Select only the last stream
stream = all_streams[-1]
st.info(f"**Log stream name**: `{stream['logStreamName']}`")
        # Get log events. Note: get_log_events paginates via 'nextForwardToken'
        # (always present in the response), so stop when the token stops changing
        events = []
        logs_batch = cloudwatch.get_log_events(
            logGroupName=log_group, logStreamName=stream['logStreamName'],
            startFromHead=True)
        events.extend(logs_batch['events'])
        token = logs_batch['nextForwardToken']
        while True:
            logs_batch = cloudwatch.get_log_events(
                logGroupName=log_group, logStreamName=stream['logStreamName'],
                nextToken=token, startFromHead=True)
            events.extend(logs_batch['events'])
            if logs_batch['nextForwardToken'] == token:
                break
            token = logs_batch['nextForwardToken']
# Return dataframe with log timestamp and message
        df = pd.DataFrame(
            [{k: v for k, v in event.items() if k in ('timestamp', 'message')}
             for event in events]
        )
        # CloudWatch timestamps are epoch milliseconds in UTC
        df['timestamp'] = df['timestamp'].apply(
            lambda x: gmt2rome(
                datetime.utcfromtimestamp(x / 1000.).strftime('%Y-%m-%d %H:%M:%S'),
                fmt='%Y-%m-%d %H:%M:%S'))
return df
def get_log_groups(cloudwatch: boto3.client, prefix: str, env: Optional[str]) -> list:
"""Retrieve log groups given prefix.
Args:
cloudwatch (boto3.client): cloudwatch client.
prefix (str): log group prefix.
env (Optional[str]): AWS environment.
Returns:
Matching log groups.
"""
group_batch = cloudwatch.describe_log_groups(logGroupNamePrefix=prefix)
all_groups = []
all_groups += group_batch['logGroups']
while 'nextToken' in group_batch:
group_batch = cloudwatch.describe_log_groups(
logGroupNamePrefix=prefix, nextToken=group_batch['nextToken'])
all_groups += group_batch['logGroups']
if env:
return list(filter(lambda x: f'-{env}' in x, map(lambda x: x['logGroupName'], all_groups)))
else:
return list(map(lambda x: x['logGroupName'], all_groups))
def _configure_aggrid(df: pd.DataFrame) -> GridOptionsBuilder:
"""Configure AgGrid options.
Args:
df (pd.DataFrame): dataframe to display.
Returns:
Configured options builder.
"""
# Builder initialization
gb = GridOptionsBuilder.from_dataframe(df)
# Streamlit dark theme options
gb.configure_grid_options(
rowStyle={'color': '#FAFAFA', 'background': '#0E1117'})
    # (Optional) custom JS code injected to conditionally highlight log records
    # FIXME: as of now, unable to inject the string to match via f-string
    # due to an AgGrid custom component error... defaults to 'ERROR'
highlight_style_jscode = JsCode('''
function(params) {
if (params.value.includes('ERROR')) {
return {
'color': 'white',
'backgroundColor': '#f63366'
}
} else {
return {
'color': 'white',
'backgroundColor': '#0E1117'
}
}
};
''')
gb.configure_column("message", cellStyle=highlight_style_jscode)
return gb
def display_logs(df: pd.DataFrame) -> AgGrid:
"""Display logs via AgGrid.
Args:
df (pd.DataFrame): logs dataframe.
Returns:
AgGrid table.
"""
return AgGrid(
df,
gridOptions=_configure_aggrid(df).build(),
allow_unsafe_jscode=True
)
def load_sidebar():
    """Build the sidebar and return environment name and CloudWatch client.
    Returns:
        Environment name and boto3 logs client, or (None, None) on failure.
    """
with st.sidebar:
st.markdown('# ☁️ AWS CloudWatch Logs')
profile = st.text_input('AWS named profile', value='')
try:
session = boto3.session.Session(
profile_name=profile,
region_name=os.environ.get('AWS_REGION', 'eu-central-1')
)
            # Assumes profiles named like '<name>-<env>', e.g. 'myproject-dev'
            env = profile.split('-')[-1]
return env, session.client('logs')
except Exception as e:
st.error(e)
return None, None
def main():
env, cloudwatch = load_sidebar()
if env and cloudwatch:
cols = st.columns(2)
with cols[0]:
resource_type = st.text_input(
'Resource type',
help='AWS resource type as per log groups naming (e.g. `lambda`, `codebuild`, `ecs`, ...)'
)
with cols[1]:
resource_prefix = st.text_input(
'Common prefix',
help='Possible common prefix to resources (e.g. use case name)'
)
with cols[0]:
env = env if st.checkbox(f'Filter log groups containing -{env}') else None
if resource_type:
prefix = f'/aws/{resource_type}/'
if resource_prefix:
prefix += f'{resource_prefix}-'
matching_groups = get_log_groups(cloudwatch, prefix, env)
if matching_groups:
format_group = lambda x: x.replace(prefix, '').replace(f'-{env}', '')
log_group = st.selectbox(
'Matching resources',
options=matching_groups,
format_func=format_group)
# Log display
last_logs = get_last_log(cloudwatch, log_group)
if last_logs is not None:
st.markdown(f'## Most recent log for `{format_group(log_group)}`')
display_logs(last_logs)
else:
st.error(f"No streams for log group `{log_group}`")
else:
st.error('No matching log groups for given resource type and prefix.')
if __name__ == "__main__":
st.set_page_config(page_title='AWS CloudWatch logs',
page_icon='☁️', layout='wide')
    main()
```
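As mentioned above, a date filter would be a straightforward extension. Below is a minimal, hypothetical sketch (the helper name and its wiring into `main()` are assumptions, not part of the monitor): it picks a stream whose last event falls on a chosen day, using the `lastEventTimestamp` field that `describe_log_streams` returns.

```python
from datetime import date, datetime
from typing import Optional

def pick_stream_by_date(streams: list, day: date) -> Optional[dict]:
    # Hypothetical helper: keep streams whose last event falls on the given day
    # ('lastEventTimestamp' is epoch milliseconds, UTC)
    same_day = [
        s for s in streams
        if 'lastEventTimestamp' in s
        and datetime.utcfromtimestamp(s['lastEventTimestamp'] / 1000.).date() == day
    ]
    # Streams are ordered by last event time (ascending), so take the latest match
    return same_day[-1] if same_day else None

# Inside main(), a date widget could then drive the stream selection, e.g.:
# day = st.date_input('Execution date')
# stream = pick_stream_by_date(all_streams, day)
```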
## Structure
Given a virtual environment with the above requirements installed, and the AWS CLI configured with (at least) one named profile, you should be good to go.
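For reference, named profiles live in `~/.aws/credentials`. The sidebar code derives the environment from the suffix after the last `-`, so a profile following that convention might look like this (profile name and keys below are placeholders):

```ini
[myproject-dev]
aws_access_key_id = <your-access-key-id>
aws_secret_access_key = <your-secret-access-key>
```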
Usage:
- in the sidebar you can insert the named profile name as per `~/.aws/credentials`
- in the Resource type text input you can insert the "type" of resource whose logs you want to retrieve (e.g. `lambda`, `ecs`, ...)
- you can further filter logs by requesting a match with a common prefix
- matching resources will be available in the subsequent selectbox
- selected resource logs will be shown through AgGrid, with (possible) error lines highlighted by default
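To sanity-check credentials and log group naming outside Streamlit, a quick script along these lines should print the matching log groups (profile, region, and prefix are placeholders to adapt):

```python
import boto3

# Placeholders: adapt profile, region and prefix to your account
session = boto3.session.Session(profile_name='myproject-dev', region_name='eu-central-1')
logs = session.client('logs')

# Same 'nextToken' pagination pattern used by get_log_groups above
batch = logs.describe_log_groups(logGroupNamePrefix='/aws/lambda/')
groups = batch['logGroups']
while 'nextToken' in batch:
    batch = logs.describe_log_groups(
        logGroupNamePrefix='/aws/lambda/', nextToken=batch['nextToken'])
    groups += batch['logGroups']
print([g['logGroupName'] for g in groups])
```

Once everything checks out, launch the monitor with `streamlit run <your_script>.py`.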