How To Ingest App Metrics from Slack into Elasticsearch

Recently I started helping Cardbucks, a very early-stage startup team. They’re running pretty bare-bones during their early market-fit experiments and haven’t set up any application monitoring or business intelligence solution yet. However, they’ve been logging all user actions to a Slack room from Day One, which is awesome. So for a hack day, I built a bot to scrape the historical messages and ingest all new incoming metrics from Slack into Elasticsearch.

Ingest Real-Time Metrics

The first step was to find an easy bot framework that lets me receive new messages in (near) real time. The Slack team has generously provided the skeleton with python-rtmbot. This is a callback-based bot engine, so we only need to write a simple plugin and configure it with our Slack token to ingest metrics from the real-time message stream.

Below is a simple example of how we did this for the Cardbucks team. Obviously, since we’re parsing the textual messages into meaningful metrics, you’ll need to customize this to the metrics messages that your application is logging to Slack.

"""
Listens to incoming Slack messages from cardbucks-bot and stores the metrics in Elasticsearch
"""
from elasticsearch import Elasticsearch
from datetime import datetime
import re
 
# rtmbot variables
crontable = []
outputs = []
 
ELASTIC_HOST = '[your-hostname]'
ELASTIC_INDEX = '[your-index]'
ELASTIC_TYPE = '[your-type]'
USER_ID = '[your Slack bot id]'
CHANNEL = '[your Slack channel id]'
 
METRIC_PATTERNS = {
    'user_registration':       re.compile('New User Registration'),
    # NOTE: the original group name was lost; 'reason' is an assumed placeholder
    'user_disabled':           re.compile('User Disabled(: )?(?P<reason>.*)'),
    'user_enabled':            re.compile('User Enabled'),
    'payment_failed':          re.compile('Payment Failed'),
    'fingerprint_blacklisted': re.compile('Fingerprint Blacklisted'),
    'ip_addr_blacklisted':     re.compile('IP Address Blacklisted'),
    'purchase_confirmation':   re.compile('User purchase\n(?P<purchases>.*)', re.DOTALL)
    # and many more types of events captured...
}
 
POST_PROCESSING = {
  'purchases': lambda purchases, fields: process_purchases(purchases, fields)
   # ... and more of these too
}
 
# global variables
client = Elasticsearch(ELASTIC_HOST)
epoch = datetime.utcfromtimestamp(0)
 
def process_purchases(purchases, fields):
    if purchases:
        fields["cards"] = []
        for per_merchant in filter(None, purchases.split('\n')):
            count, merchant = per_merchant.strip().split(None, 1)
            fields["cards"].append({"merchant": merchant, "value": int(count)})
    return fields
 
def to_metric(data):
    # Try each pattern in turn; the first one that matches decides the event type
    for event, pattern in METRIC_PATTERNS.items():
        m = pattern.match(data['text'])
        if m:
            doc = {
                'event': event,
                'source': 'slack',
                'timestamp': datetime.utcfromtimestamp(float(data['ts']))
            }
            fields = m.groupdict()
            for field, handler in POST_PROCESSING.items():
                fields = handler(fields.pop(field, None), fields)
            # Merge the captured fields into the base document
            doc.update(fields)
            return doc
    # No pattern matched this message
    return None
 
def process_message(data):
    # Keep only the metrics messages we care about; .get() avoids a KeyError
    # on messages that carry no 'user' or 'channel' field
    if data.get('user') == USER_ID and data.get('channel') == CHANNEL:
        metric = to_metric(data)
        if not metric:
            print("ERROR parsing %s" % data)
        else:
            client.index(ELASTIC_INDEX, ELASTIC_TYPE, metric)

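To make the parsing step concrete, here is a minimal, self-contained sketch of how a single purchase message could turn into a metric document. It mirrors the plugin above but leaves out Elasticsearch so it runs offline. The sample message text, the merchant names, and the helper names `parse_purchases` and `parse_message` are assumptions for illustration, not the actual Cardbucks format. It also uses a timezone-aware timestamp, which differs slightly from the plugin.

```python
import re
from datetime import datetime, timezone

# Hypothetical pattern table with the same shape as METRIC_PATTERNS above.
PATTERNS = {
    'user_enabled': re.compile(r'User Enabled'),
    'purchase_confirmation': re.compile(r'User purchase\n(?P<purchases>.*)', re.DOTALL),
}


def parse_purchases(raw):
    """Turn lines like '2 Starbucks' into [{'merchant': ..., 'value': ...}]."""
    cards = []
    for line in raw.split('\n'):
        if not line.strip():
            continue  # skip blank lines
        count, merchant = line.strip().split(None, 1)
        cards.append({'merchant': merchant, 'value': int(count)})
    return cards


def parse_message(message):
    """Return a metric document for a Slack message, or None if nothing matches."""
    for event, pattern in PATTERNS.items():
        match = pattern.match(message['text'])
        if match:
            doc = {
                'event': event,
                'source': 'slack',
                # Slack's 'ts' is a string of seconds since the Unix epoch
                'timestamp': datetime.fromtimestamp(float(message['ts']), timezone.utc),
            }
            fields = match.groupdict()
            raw = fields.pop('purchases', None)
            if raw:
                doc['cards'] = parse_purchases(raw)
            doc.update(fields)
            return doc
    return None


# Assumed sample message; real messages will look different.
sample = {'ts': '1458170917.000004', 'text': 'User purchase\n2 Starbucks\n1 Home Depot'}
metric = parse_message(sample)
print(metric['event'], metric['cards'])
```

Messages that match no pattern come back as `None`, which is the case the plugin reports as a parsing error.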
 

Backfill Historical Metrics

After we have new metrics coming into the system, we also need to backfill all of the existing metrics from Day One into Elasticsearch. This will help us get a much better idea of growth rate and visualize other trends in our data.

Reading through the Slack API, we see the `channels.history` API call. This looks like exactly what we need. Again, the Slack team provides for us: the standard python-slackclient makes such API calls easy to access.

#!/usr/bin/env python
from slackclient import SlackClient
from elasticsearch import Elasticsearch
from plugins.elastic.elastic import CHANNEL, USER_ID, process_message, to_metric
from datetime import datetime
import yaml
import json
 
ELASTIC_HOST = '[your-hostname]'
SIZE = 1000
OLDEST = 0
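# Slack expects a Unix timestamp here, not a datetime object
LATEST = (datetime.utcnow() - datetime.utcfromtimestamp(0)).total_seconds()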
 
config = yaml.safe_load(open('rtmbot.conf', 'r'))
sc = SlackClient(config["SLACK_TOKEN"])
client = Elasticsearch(ELASTIC_HOST)
 
response = json.loads(sc.api_call('channels.history', channel=CHANNEL, count=SIZE, oldest=OLDEST, latest=LATEST))
for message in response['messages']:
    if 'channel' not in message:
        message['channel'] = CHANNEL
    if 'user' not in message:
        message['user'] = USER_ID
    process_message(message)
 
while response['has_more']:
    oldest_ts = response['messages'][-1]['ts']
    response = json.loads(sc.api_call('channels.history', channel=CHANNEL, count=SIZE, oldest=OLDEST, latest=oldest_ts))
    for message in response['messages']:
        if 'channel' not in message:
            message['channel'] = CHANNEL
        if 'user' not in message:
            message['user'] = USER_ID
        process_message(message)
 
if 'is_limited' in response and response['is_limited']:
    print("We have more messages that we can't access. :'(")

Hopefully, you’re either early enough in the process, have few enough metrics, or pay for Slack so that you don’t run into their 10,000 message history limit.
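The paging logic in the backfill script can also be written as one generator, which avoids repeating the message loop. This is only a sketch: `fetch_page` is a hypothetical callable standing in for the `channels.history` call, and `fake_fetch` is a stand-in with made-up messages so the example runs without a Slack token. It assumes, as the script above does, that each page arrives newest first and that `latest` excludes the message with that exact timestamp.

```python
def iter_history(fetch_page, page_size=1000):
    """Yield every message, newest first, following Slack-style paging.

    fetch_page(latest, count) is a hypothetical callable returning a dict
    shaped like a channels.history response:
    {'messages': [...], 'has_more': bool}.
    """
    latest = None  # None means "start from now"
    while True:
        response = fetch_page(latest, page_size)
        messages = response.get('messages', [])
        for message in messages:
            yield message
        if not response.get('has_more') or not messages:
            break
        # Pages are newest first, so the last message is the oldest on this page.
        latest = messages[-1]['ts']


# Stand-in for the real API: five fake messages, newest first.
FAKE_CHANNEL = [{'ts': '%d.000000' % t, 'text': 'msg %d' % t} for t in range(5, 0, -1)]


def fake_fetch(latest, count):
    """Return up to `count` fake messages strictly older than `latest`."""
    older = [m for m in FAKE_CHANNEL
             if latest is None or float(m['ts']) < float(latest)]
    return {'messages': older[:count], 'has_more': len(older) > count}


all_ts = [m['ts'] for m in iter_history(fake_fetch, page_size=2)]
print(all_ts)
```

With a real client, `fetch_page` would wrap the API call, and each yielded message would be handed to `process_message` after filling in the missing `channel` and `user` fields.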

Now that you have all of your data in Elasticsearch, go forth and visualize with Kibana!

Posted in Tutorials.


One Response


  1. Jonathan Stiansen says

    Hey Cody,

    Really loving your two articles on 12FAs. And I wouldn’t be emailing you if you hadn’t made such great content already. A few points of improvement:

    Not the point of this message, but I’ve got some recommendations on the format of your Dockerfile, like having `ADD . /myapp` before installing requirements. This will cause requirements to be redownloaded and reinstalled every time you change any file, since changing any file covered by ADD invalidates all the cached layers after it. This will make development REALLY crappy. It is more useful to copy the requirements file first and run the install. The end is usually the best place for `ADD . /app`, since it means no other layers get invalidated on change.

    The big one: I was wondering if I could ask a question about step 5 of your 12 factor app? I’ve been struggling with this part myself in our business. I think there is another way to do it that would have fewer downsides than the way you described. I also think your method would not work well with compiled apps (ones that can be MUCH smaller/faster when compiled for production). I might be misreading it, so here is what I’m reading you saying:

    “Use a base dockerfile/image for your app”
    “Add environment variables for different environments through another dockerfile”

    Problems:
    – You’ll have so many images on your system that aren’t providing anything except environment variables
    – Some apps (Java, C++, really any compiled language) need to be built totally differently, which means your base image won’t work for production AND testing of such apps.
    – Most easily though, you should insert environment variables at runtime! Also, can you imagine putting all your secrets into a Dockerfile and checking it into source control? That could be a big security risk!
    – Instead you can, at runtime, use -e or, better yet, --env-file to give it a non-version-controlled (for security) secret file or config file. So you can literally run the same container for each env with a different env-file! :-) Safer, faster, and simpler!

    Anyways let me know what you think, thanks for your work really enjoying it.


