Using Kinesis and Kibana to get insights from your data

Matthew Painter, CTO at import.io, walks you through how we use Amazon Kinesis to route event data, such as queries being made on the platform, and how to let your product and user teams analyze those events in the fantastic Kibana 4, a “flexible analytics and visualization platform” powered by Elasticsearch.

A quick overview of Elasticsearch & Kibana

Elasticsearch


Elasticsearch is now arguably the de facto search platform, having surpassed Apache Solr thanks to its easy scalability, clean API design and awesome functionality. If you wonder what on earth indexes all the code on GitHub, look no further.

Under the hood it is a REST document store with a Lucene-based inverted index. This means that the objects stored in it are flattened and indexed by Lucene so that they can be queried to, for example, find all documents containing a term, generate statistics about documents, or create aggregations.

Once you’ve indexed some documents you can do a wide range of analysis on their fields, looking at everything from term frequency to variance. However, trying to get most people to do their analysis via REST API isn’t going to happen 🙂 This is where Kibana comes in…

Kibana

Kibana dashboard

Kibana is an open source analytics and visualization platform that allows you to explore, visualize and dashboard the data held within your Elasticsearch cluster using the charts, tables, and maps provided by the powerful graphing library d3.

Kibana’s forte is making it easy for non-technical users to understand large volumes of data. You can create and share in-browser dynamic dashboards that update in real-time as your data changes.

Kibana is often used with Logstash to visualize log data, and I’ll be following this post up with an updated example of how we visualize ELB logs in Kibana.

A quick overview of Kinesis

Kinesis is an AWS message distribution platform based on the classic producer/consumer publish/subscribe model:

Kinesis diagram

Streams

So we basically push messages onto streams, and we can have multiple “applications” (each identified by a unique application id) attached to a stream, each consuming a copy of every message added to the stream. This is the major difference between queues and streams – a stream is a decoupled broadcast model. If you’re thinking “that sounds like a topic”, then yep, a stream is a topic, or any other name for a logical channel.

The events (“data records”) are retained by Kinesis for 24 hours, so when you connect to a stream you will get all the messages that your application has not yet received from the last 24 hours – whatever you produce but don’t consume in that time is lost to that application!

Scaling

It’s a sharded scaling model, which uses a consistent hash ring via a partition key to assign messages to shards. You can increase a stream’s throughput by scaling out, i.e. adding more shards. The client libraries handle all of the sharding events transparently with zero downtime. Each shard can support up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second (including partition keys). You can scale a Kinesis stream much like an Auto Scaling group, albeit not quite so easily at the moment.
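
For illustration, here is a rough sketch of splitting a shard with the AWS SDK for Java v1 – the stream name is made up, and in a real system you would choose which shard to split based on its metrics:

import java.math.BigInteger;

import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.SplitShardRequest;

public class ShardSplitter {
    public static void main(String[] args) {
        AmazonKinesisClient kinesis = new AmazonKinesisClient();

        // Pick the first open shard of the (illustrative) "events" stream.
        Shard shard = kinesis.describeStream("events")
                .getStreamDescription().getShards().get(0);

        // Split at the midpoint of the shard's hash key range so the load is
        // divided roughly evenly between the two child shards.
        BigInteger start = new BigInteger(shard.getHashKeyRange().getStartingHashKey());
        BigInteger end = new BigInteger(shard.getHashKeyRange().getEndingHashKey());
        String midpoint = start.add(end).divide(BigInteger.valueOf(2)).toString();

        kinesis.splitShard(new SplitShardRequest()
                .withStreamName("events")
                .withShardToSplit(shard.getShardId())
                .withNewStartingHashKey(midpoint));
    }
}

The reverse operation, merging two adjacent shards back together, works the same way via a MergeShardsRequest.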

Data Records

So your producers push data records into your streams. Each message is a BLOB of up to 1MB. It’s up to you whether you use Protocol Buffers, gzipped JSON, BSON, Avro, or whatever else takes your fancy. Just be aware that whatever you use for your encoding, your consumers will need to be able to decode it! This was our motivation for using gzipped JSON: compatibility with easily injecting the events into Redshift, the AWS data warehouse product.

A message has an associated partition key – a Unicode string that is used to partition the messages into ordered sub-streams and determines which shard each message is routed to. This can normally be random.

Creating a stream

You can create a new stream in the AWS console, as well as see stats such as:

Kinesis stats

Just add a single shard to begin with; you can always increase the shard count later.
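
If you would rather script it than click around the console, creating the stream with the AWS SDK for Java v1 is a one-liner – a sketch with an illustrative stream name:

import com.amazonaws.services.kinesis.AmazonKinesisClient;

public class CreateStream {
    public static void main(String[] args) {
        AmazonKinesisClient kinesis = new AmazonKinesisClient();
        // One shard is plenty for a dev setup; split shards later if you need more throughput.
        kinesis.createStream("events", 1);
    }
}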

The usual AWS rules apply when rolling out in a production setting: create separate user accounts for different producers and consumers via IAM, and grant them permissions for only the streams they need access to.

Producing messages

Once you’ve added a stream you can start to add data to it! Adding via the Java SDK is simple.
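
Below is a minimal sketch of a single PutRecord call, assuming the AWS SDK for Java v1 with credentials from the default provider chain – the stream name and JSON payload are illustrative:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordRequest;

public class EventProducer {
    public static void main(String[] args) {
        AmazonKinesisClient kinesis = new AmazonKinesisClient();

        // An illustrative event payload; we gzip ours in practice.
        String event = "{\"type\":\"page\",\"url\":\"http://example.com\"}";

        kinesis.putRecord(new PutRecordRequest()
                .withStreamName("events")
                .withData(ByteBuffer.wrap(event.getBytes(StandardCharsets.UTF_8)))
                // A random partition key spreads records evenly across shards.
                .withPartitionKey(UUID.randomUUID().toString()));
    }
}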

Once you’ve got the hang of it, it is worth batching up your PutRecord requests into bulk PutRecords requests to reduce the HTTP overhead. You can create a buffer within your application and flush the events when the next event pushes the size above 5MB (for the entire request, so leave some headroom!) or 500 records, or every N seconds, whichever comes first.
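
A rough sketch of that buffering approach – the class is illustrative, the timer-based flush is left out, and real code should also inspect the PutRecords response for failed records:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;

public class BufferedProducer {
    private static final int MAX_RECORDS = 500;
    private static final int MAX_BYTES = 4 * 1024 * 1024; // leave headroom under the 5MB request limit

    private final AmazonKinesisClient kinesis = new AmazonKinesisClient();
    private final List<PutRecordsRequestEntry> buffer = new ArrayList<>();
    private int bufferedBytes = 0;

    public synchronized void add(String json) {
        byte[] payload = json.getBytes(StandardCharsets.UTF_8);
        buffer.add(new PutRecordsRequestEntry()
                .withData(ByteBuffer.wrap(payload))
                .withPartitionKey(UUID.randomUUID().toString()));
        bufferedBytes += payload.length;
        if (buffer.size() >= MAX_RECORDS || bufferedBytes >= MAX_BYTES) {
            flush();
        }
    }

    public synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Send the whole buffer as one bulk request, then reset it.
        kinesis.putRecords(new PutRecordsRequest()
                .withStreamName("events")
                .withRecords(buffer));
        buffer.clear();
        bufferedBytes = 0;
    }
}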

Brilliant, so we’ve got a stream and we’re pushing our page events on to it when people are running queries on the platform, and we need to analyze them.

Kibana and Elasticsearch

We’re going to set up Elasticsearch so that we have rolling indexes of page data, where a new index called “kinesis-YYYYMMDD” is created every day and an alias “kinesis” points to the most recent one.

Get a stack up and running using Docker Compose

We use Docker to compose many of our services, and that is the approach we are going to use here. I’m going to presume that you have Docker and Docker Compose installed on your machine – you could always run everything directly if you wanted.

I’m going to set up a dev Kibana environment using Docker Compose:

git clone https://github.com/mjgp2/docker-elk
cd docker-elk
docker-compose up

I’ve tuned this fork so that the Elasticsearch transport port is exposed, and removed Logstash.

If you want your data to be persisted, you will need to follow the instructions to do that.

Needless to say, if you want this in production you’ll need a cluster and all the ops work to monitor it, etc. Or just use Found!

Create an index template

Create an index template in Elasticsearch so that every new rolling kinesis-* index we create each day picks up the same schema:

curl -XPUT $DOCKER_HOST:9200/_template/kinesis -d '{
    "template": "kinesis-*",
    "settings": {
        "number_of_shards": 1
    },
    "mappings": {
        "page": {
            "_source": {
                "enabled": false
            },
            "properties": {
                ... add your properties here ...
            }
        }
    }
}
'

We’re just creating 1 shard here as it’s a dev setup.

This is the time to decide what you want to analyze and visualize in Kibana. In particular, if you want to geo-plot data, it’s best to have the geohash turned on:

"geoPoint": {
    "type": "geo_point",
    "geohash": true,
    "lat_lon": true
}

And similarly, if you are analyzing data with a time component, add a field like this:

"timestamp": {
    "type": "date"
}

Create the index and the alias

Next we need to create the first Elasticsearch index and the alias:

TODAY=$(date +'%Y%m%d')
curl -XPUT $ES_HOST:9200/kinesis-$TODAY/
curl -XPOST $ES_HOST:9200/_aliases -d'
{
    "actions": [
        { "add":    { "index": "kinesis-'$TODAY'", "alias": "kinesis" }}
    ]
}
'

Rolling the index

We need to set the index up to roll every day, so put a script like the following on an administration box somewhere and run it as a daily cron job:

TODAY=$(date +'%Y%m%d')
curl -XPUT $ES_HOST:9200/kinesis-$TODAY/
curl -XPOST $ES_HOST:9200/_aliases -d'
{
    "actions": [
        { "remove":    { "index": "kinesis-*", "alias": "kinesis" }},
        { "add":    { "index": "kinesis-'$TODAY'", "alias": "kinesis" }}
    ]
}
'

You can use Curator to close or delete the indexes after a certain number of days as part of your cron job, for example:

pip install elasticsearch-curator
curator --host $ES_HOST close indices --older-than 30 --time-unit days --timestring %Y%m%d --regex "^kinesis-"

Consuming messages

Now we need to get the data flowing from Kinesis to our index!

Your application (identified by its application id) can subscribe to a stream via the AWS client library, and you will be pushed new events with low-ish latency (less than 10s in my experience) – along with the events from the last 24 hours the first time you connect. The last message you consumed is tracked in a DynamoDB table that is transparently created by the client library.
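
As a rough sketch of the shape the client library expects – assuming version 1.x of the Kinesis Client Library for Java, with checkpointing and error handling simplified – a record processor looks something like this:

import java.nio.charset.StandardCharsets;
import java.util.List;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;

public class PageEventProcessor implements IRecordProcessor {

    @Override
    public void initialize(String shardId) {
        // Called once for each shard this worker is assigned.
    }

    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        for (Record record : records) {
            // Decode the payload and index it into Elasticsearch here.
            String json = StandardCharsets.UTF_8.decode(record.getData()).toString();
            System.out.println("got event: " + json);
        }
        try {
            // Record progress in the DynamoDB table that the KCL manages for us.
            checkpointer.checkpoint();
        } catch (Exception e) {
            // In real code, log and retry; a failed checkpoint means reprocessing later.
        }
    }

    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        // Checkpoint on clean shutdown so another worker can take over the shard.
    }
}

You register a factory that creates these processors with a KCL Worker, which takes care of shard leases, checkpoints and load balancing across workers.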

I’ve made an example application available on GitHub so you can see exactly what you need to do in Java to make this happen. It boils down to:

  • A config file
  • An executor class to read the config file, initialize the connection, and wire up the transformer
  • A transformer to convert a Kinesis data record into a Java object and then into an Elasticsearch document

The example also geocodes the IP so that we can plot where our users are querying us from.

Once you’ve set this up, you can run your small Java application as a service on an admin server somewhere, and it will bulk insert into Elasticsearch!
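
The bulk insert itself is nothing exotic – a rough sketch using the Elasticsearch 1.x Java transport client (the API differs in later versions, and the host and documents here are illustrative):

import java.util.List;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class PageIndexer {
    // Connects to the transport port exposed by the docker-elk stack.
    private final Client client = new TransportClient()
            .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));

    public void index(List<String> jsonEvents) {
        BulkRequestBuilder bulk = client.prepareBulk();
        for (String json : jsonEvents) {
            // Write through the "kinesis" alias so documents land in today's index.
            bulk.add(client.prepareIndex("kinesis", "page").setSource(json));
        }
        BulkResponse response = bulk.execute().actionGet();
        if (response.hasFailures()) {
            // In real code, log and retry the failed items.
            System.err.println(response.buildFailureMessage());
        }
    }
}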

Using Kibana

You can now load up Kibana at http://$KIBANA_HOST:5601/ and, in the setup wizard, choose your kinesis-* indexes as the data to visualize.

You can then create visualizations and dashboards from those visualizations. Here’s an example for us of where in the last 24 hours users made queries on our platform from around the world 🙂

import.io users around the world

You can use many d3 charts to visualize data in Kibana, including:

  • Area Chart
  • Data Table
  • Line Chart
  • Markdown Text Widget
  • Pie Chart (including “doughnut” charts)
  • Raw Document Widget
  • Single Metric Widget
  • Tile Map
  • Vertical Bar Chart

and power them with statistics computed from your indexed data, including:

  • Unique counts (cardinality)
  • Non-date histograms
  • Ranges
  • Significant terms
  • Percentiles

Gathering insights

Now that your data is available to your internal users through Kibana, you can let them loose to start analyzing it. For example, insights that we’ve gathered internally include:

  • Splitting usage from data-centers and home computers
  • Judging the size of our market in different countries
  • Tracking overall data throughput
  • Operationally alerting us when we experience elevated query error rates
  • Modelling how and when we should scale based on pre-empting demand
  • Understanding what users are querying
  • Segmenting users by CPU time, i.e. cost
  • Mapping percentiles of query CPU time to better decide what our limits should be
  • Analyzing platform usage for users who become leads to find predictive markers

And these are just some of the insights fed back to me directly after asking some of the team! Once your product, ops, user and account teams have access to data like this to analyse, they will come up with things they want to know that you haven’t even dreamed of yet 🙂

Conclusion

That’s a whistle-stop tour of how to get your Kinesis stream into Elasticsearch and use Kibana to get business insights. If you have any questions, feel free to contact me!
