On April 8, Amazon ES launched support for event monitoring and alerting. To use this feature, you work with monitors—scheduled jobs—that have triggers, which are specific conditions that you set, telling the monitor when it should send an alert. An alert is a notification that the triggering condition occurred. When a trigger fires, the monitor takes action, sending a message to your destination.
This post uses a simulated IoT device farm to generate and send data to Amazon ES.
Click through for a demo.
To perform the steps below, I set up a single Ubuntu 16.04 machine on AWS EC2 using local storage. In real-life scenarios you will probably have all these components running on separate machines.
I started the instance in the public subnet of a VPC and then set up a security group to enable access from anywhere using SSH and TCP 5601 (for Kibana). Finally, I added a new elastic IP address and associated it with the running instance.
The example logs used for the tutorial are Apache access logs.
This is a great walkthrough on setup and basic configuration. If you don’t have something in place to manage logs, the ELK stack is fine.
Step 4: [Creating master Core]
First, we need to create a core for indexing the data. The Solr create command has the following options:
- -c <name> — Name of the core or collection to create (required).
- -d <confdir> — The configuration directory, useful in the SolrCloud mode.
- -n <configName> — The configuration name. This defaults to the same name as the core or collection.
- -p <port> — Port of a local Solr instance to send the create command to; by default the script tries to detect the port by looking for running Solr instances.
- -s <shards> — Number of shards to split a collection into, default is 1.
- -rf <replicas> — Number of copies of each document in the collection. The default is 1.
In this example, we will use the -c parameter for core name, -rf parameter for replication and -d parameter for the configuration directory.
Read on for step-by-step instructions.
You will need the following information to connect to Elasticsearch as a JDBC data source:
- Driver Class: Set this to
- Classpath: Set this to the location of the driver JAR. By default, this is the lib subfolder of the installation folder.
The DBI functions, such as
dbSendQuery, provide a unified interface for writing data access code in R. Use the following line to initialize a DBI driver that can make JDBC requests to the CData JDBC Driver for Elasticsearch:
Read on for the full instructions.
There’s been a lot of time we have been working on streaming data. Using Apache Spark for that can be much convenient. Spark provides two APIs for streaming data one is Spark Streaming which is a separate library provided by Spark. Another one is Structured Streaming which is built upon the Spark-SQL library. We will discuss the trade-offs and differences between these two libraries in another blog. But today we’ll focus on saving streaming data to Elasticseach using Spark Structured Streaming. Elasticsearch added support for Spark Structured Streaming 2.2.0 onwards in version 6.0.0 version of “Elasticsearch For Apache Hadoop” dependency. We will be using these versions or higher to build our sbt-scala project.
Click through for an example.
The mappings Elastic SQL uses are:
Index = Table
Document = Row
Field = Column
This mapping is quite intuitive. Types are left out because they are obsolete in Elastic 6.0 on.
So let’s give it a try. I used the latest Elastic 6.4 for this demonstration and ran the queries from Kibana, although they can be run with curl or just a browser as well. First we will need some data. I found this article in Elastic documentation that suggests several data files ready to be loaded. I did not need all of the data so I only used the json file that contains all the works of William Shakespeare that can be downloaded here.
Feasel’s Law continues.
So far, I’ve done a decent job getting the data into shape. My biggest challenge, though, was the dates and times. Dates are in one field, and the times are in another. Dates look like 2014-02-26 and times look like 0852 Using a traditional datetime datatype would be nice to have, so I’ll have to do it myself. In order to turn a date and time into a datetime, I need to abut the two fields and then convert it.
I accomplished this by using a mutate filter, employing by several add_field commands. Notice how I simply abut the two times.
Read on to see how Mike does it.
Like I said earlier, we have some data that I know I’ll never use. This is flight performance data. The dataset contains diversion information. If a flight gets diverted more than once, it’s tracked here. I don’t care about that, so I’m dropping the diversion information for the second through fifth diversions. I’m also dropping some information about the airports that I believe I won’t need. This is the tricky part. Somewhere down the road, I’m going to need to enhance this data by converting all of the times to UTC.
Mike’s slowly building up to a complete, working example and it’s interesting to watch the progress along the way.
We splitted the pipeline into 2 main units: The aggregator job and the persisting job. The aggregator has one and only one responsibility. To read from the input kafka topic, process the messages and finally emit them to a new kafka topic. The persisting job then takes over and whenever a message is received from topic
temperatures.aggregatedit persists to elasticsearch.
The above approach might seem to be an overkill at first but it provides a lot of benefits (but also some drawbacks). Having two units means that each unit’s health won’t directly affect each other. If the processing job fails due OOM, the persisting job will still be healthy.
One major benefit we’ve seen using this approach is the replay capabilities this approach offers. For example, if at some point we need to persist the messages from
temperatures.aggregatedto Cassandra, it’s just a matter of wiring a new pipeline and start consuming the kafka topic. If we had one job for processing and persisting, we would have to reprocess every record from the
thermostat.data, which comes with a great computational and time cost.
Angelos also discusses some issues he and his team had with Spark Streaming on this data set, so it’s an interesting comparison.
As I was writing this, I thought I’d play with the autodetect_column_namessetting. Unfortunately, it wasn’t an option for this particular file. Logstash threw an error :exception=>java.lang.ArrayIndexOutOfBoundsException: -1which leads me to guess that my file is too wide for this setting. This file is staggeringly wide with 75 columns. If you have a more narrow file, this could be a really cool option. If your file format changes by someone adding or removing a column from the CSV, it’ll be a lot easier to maintain. Alas, it’s not an option in this situation.
Check out the script.