Step 4: [Creating master Core]
First, we need to create a core for indexing the data. The Solr create command has the following options:
- -c <name> — Name of the core or collection to create (required).
- -d <confdir> — The configuration directory, useful in the SolrCloud mode.
- -n <configName> — The configuration name. This defaults to the same name as the core or collection.
- -p <port> — Port of a local Solr instance to send the create command to; by default the script tries to detect the port by looking for running Solr instances.
- -s <shards> — Number of shards to split a collection into, default is 1.
- -rf <replicas> — Number of copies of each document in the collection. The default is 1.
In this example, we will use the -c parameter for core name, -rf parameter for replication and -d parameter for the configuration directory.
Read on for step-by-step instructions.
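As a rough sketch of how the three options named above fit together, the snippet below assembles (but does not run) a `bin/solr create` invocation. The core name, configuration path, and replica count are placeholder values chosen for illustration, not values from the article.

```python
def build_solr_create_command(name, confdir, replicas):
    """Assemble the argument list for `bin/solr create` without running it."""
    return [
        "bin/solr", "create",
        "-c", name,            # core or collection name (required)
        "-d", confdir,         # configuration directory
        "-rf", str(replicas),  # number of copies of each document
    ]

# Placeholder values for illustration only.
command = build_solr_create_command("master", "/path/to/conf", 2)
print(" ".join(command))
```

Keeping the arguments in a list means the same value could be handed to `subprocess.run` on a machine where Solr is installed.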
You will need the following information to connect to Elasticsearch as a JDBC data source:
- Driver Class: Set this to
- Classpath: Set this to the location of the driver JAR. By default, this is the lib subfolder of the installation folder.
The DBI functions, such as dbSendQuery, provide a unified interface for writing data access code in R. Use the following line to initialize a DBI driver that can make JDBC requests to the CData JDBC Driver for Elasticsearch:
Read on for the full instructions.
We have been working with streaming data for a long time, and Apache Spark is a convenient tool for it. Spark provides two APIs for streaming data: Spark Streaming, which is a separate library provided by Spark, and Structured Streaming, which is built upon the Spark SQL library. We will discuss the trade-offs and differences between these two libraries in another blog. But today we’ll focus on saving streaming data to Elasticsearch using Spark Structured Streaming. Elasticsearch added support for Spark Structured Streaming (Spark 2.2.0 onwards) in version 6.0.0 of the “Elasticsearch For Apache Hadoop” dependency. We will be using these versions or higher to build our sbt-scala project.
Click through for an example.
The mappings Elastic SQL uses are:
Index = Table
Document = Row
Field = Column
This mapping is quite intuitive. Types are left out because they are deprecated from Elastic 6.0 onward.
So let’s give it a try. I used the latest Elastic 6.4 for this demonstration and ran the queries from Kibana, although they can be run with curl or just a browser as well. First we will need some data. I found this article in Elastic documentation that suggests several data files ready to be loaded. I did not need all of the data so I only used the json file that contains all the works of William Shakespeare that can be downloaded here.
Feasel’s Law continues.
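To make the index-as-table mapping concrete, here is a minimal sketch that builds (but does not send) a request for the SQL endpoint as it existed in the 6.x line. The host, the index name `shakespeare`, and the field names `speaker` and `play_name` are assumptions for illustration; check them against the data you actually loaded.

```python
import json
import urllib.request

def build_sql_request(sql, host="http://localhost:9200"):
    """Build (but do not send) a request for the Elasticsearch 6.x SQL endpoint."""
    body = json.dumps({"query": sql}).encode("utf-8")
    return urllib.request.Request(
        url=host + "/_xpack/sql?format=txt",  # 6.x path; later versions use /_sql
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# The index plays the role of the table; the fields play the role of columns.
request = build_sql_request("SELECT speaker, play_name FROM shakespeare LIMIT 5")
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
```

Passing the request to `urllib.request.urlopen` would execute it against a running cluster; the same JSON body can also be pasted into the Kibana console.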
So far, I’ve done a decent job getting the data into shape. My biggest challenge, though, was the dates and times. Dates are in one field, and the times are in another. Dates look like 2014-02-26 and times look like 0852. A traditional datetime datatype would be nice to have, so I’ll have to build it myself. In order to turn a date and time into a datetime, I need to abut the two fields and then convert the result.
I accomplished this by using a mutate filter, employing several add_field commands. Notice how I simply abut the two times.
Read on to see how Mike does it.
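The abut-then-convert idea is independent of Logstash. As a small illustration in Python (this is not the mutate filter from the post, just the same two steps using the sample values quoted above):

```python
from datetime import datetime

def combine_date_and_time(date_text, time_text):
    """Abut a YYYY-MM-DD date and an HHMM time, then parse the result."""
    combined = date_text + " " + time_text  # e.g. "2014-02-26 0852"
    return datetime.strptime(combined, "%Y-%m-%d %H%M")

flight_time = combine_date_and_time("2014-02-26", "0852")
print(flight_time.isoformat())  # 2014-02-26T08:52:00
```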
Like I said earlier, we have some data that I know I’ll never use. This is flight performance data. The dataset contains diversion information. If a flight gets diverted more than once, it’s tracked here. I don’t care about that, so I’m dropping the diversion information for the second through fifth diversions. I’m also dropping some information about the airports that I believe I won’t need. This is the tricky part. Somewhere down the road, I’m going to need to enhance this data by converting all of the times to UTC.
Mike’s slowly building up to a complete, working example and it’s interesting to watch the progress along the way.
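Dropping the later diversions amounts to filtering fields by name. The sketch below shows one way to express that in Python; the field names and prefixes are hypothetical stand-ins, and the real dataset's column names may differ.

```python
def drop_unused_fields(event, prefixes):
    """Return a copy of the event without keys starting with any given prefix."""
    return {
        key: value
        for key, value in event.items()
        if not key.startswith(tuple(prefixes))
    }

# Hypothetical field names for illustration only.
event = {
    "FlightDate": "2014-02-26",
    "Div1Airport": "ABC",
    "Div2Airport": "",
    "Div5Airport": "",
}
unwanted = ["Div2", "Div3", "Div4", "Div5"]  # second through fifth diversions
cleaned = drop_unused_fields(event, unwanted)
print(sorted(cleaned))
```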
We split the pipeline into two main units: the aggregator job and the persisting job. The aggregator has one and only one responsibility: to read from the input Kafka topic, process the messages, and finally emit them to a new Kafka topic. The persisting job then takes over, and whenever a message is received from the topic temperatures.aggregated, it persists it to Elasticsearch.
The above approach might seem like overkill at first, but it provides a lot of benefits (and also some drawbacks). Having two units means that each unit’s health won’t directly affect the other. If the processing job fails due to an OOM error, the persisting job will still be healthy.
One major benefit we’ve seen is the replay capability this approach offers. For example, if at some point we need to persist the messages from temperatures.aggregated to Cassandra, it’s just a matter of wiring up a new pipeline and starting to consume the Kafka topic. If we had one job for processing and persisting, we would have to reprocess every record from thermostat.data, which comes at a great computational and time cost.
Angelos also discusses some issues he and his team had with Spark Streaming on this data set, so it’s an interesting comparison.
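The two-job split and its replay benefit can be sketched without Kafka at all. In the toy model below, an append-only list stands in for a topic, plain lists stand in for Elasticsearch and Cassandra, and per-device averaging is an assumed form of "processing"; none of this is the authors' code or a Kafka client API.

```python
from collections import defaultdict

class Topic:
    """Append-only log standing in for a Kafka topic (not a Kafka client)."""
    def __init__(self):
        self.messages = []

    def publish(self, message):
        self.messages.append(message)

    def read_from(self, offset=0):
        return self.messages[offset:]

def aggregator_job(source, sink):
    """Average the readings per device and emit one message per device."""
    readings = defaultdict(list)
    for message in source.read_from(0):
        readings[message["device"]].append(message["temperature"])
    for device, values in sorted(readings.items()):
        sink.publish({"device": device, "avg": sum(values) / len(values)})

def persisting_job(source, store):
    """Copy every aggregated message into a store (a list stands in for a database)."""
    store.extend(source.read_from(0))

thermostat_data = Topic()
aggregated = Topic()
for device, temperature in [("a", 20.0), ("a", 22.0), ("b", 18.0)]:
    thermostat_data.publish({"device": device, "temperature": temperature})

aggregator_job(thermostat_data, aggregated)

elasticsearch_store, cassandra_store = [], []
persisting_job(aggregated, elasticsearch_store)
# Replay: a second sink reads the aggregated topic without re-running the aggregator.
persisting_job(aggregated, cassandra_store)
print(cassandra_store)
```

Because the aggregated messages stay in the intermediate topic, adding the second sink never touches the raw readings, which is the replay benefit described above.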
As I was writing this, I thought I’d play with the autodetect_column_names setting. Unfortunately, it wasn’t an option for this particular file. Logstash threw an error, :exception=>java.lang.ArrayIndexOutOfBoundsException: -1, which leads me to guess that my file is too wide for this setting. This file is staggeringly wide with 75 columns. If you have a narrower file, this could be a really cool option. If your file format changes by someone adding or removing a column from the CSV, it’ll be a lot easier to maintain. Alas, it’s not an option in this situation.
Check out the script.
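For readers unfamiliar with the idea behind column-name autodetection, here is an analogy in Python rather than Logstash: `csv.DictReader` takes its field names from the header row, so adding or removing a column in the file requires no change to the code. The sample columns are made up.

```python
import csv
import io

# Two-line sample with made-up column names; the header row supplies the field names.
sample = "FlightDate,Carrier,DepTime\n2014-02-26,AA,0852\n"

reader = csv.DictReader(io.StringIO(sample))
rows = list(reader)
print(reader.fieldnames)
print(rows[0]["DepTime"])
```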
Logstash is an incredibly powerful tool. If you can put data into a text file, Logstash can parse it. It works well with a lot of data, but I’m finding myself using it more for event data. When I say event data, I mean that if something triggers a log event and writes to a log, it’s an event. For the purposes of my demos, I’m using data from the Bureau of Transportation Statistics. They track flight performance data, which works perfectly for my uses. It’s a great example dataset without using anything related to my real job.
Logstash configuration files typically have three sections, INPUT, FILTER, and OUTPUT. However, FILTER is optional.
This is the first part in a series, so stay tuned.
In Elasticsearch, the aggregations framework is responsible for providing aggregated data based on a search query. Aggregations can be composed together in order to build complex summaries of the data. For a better understanding, consider an aggregation as a unit of work that develops analytic information over a set of documents available in Elasticsearch. Various types of aggregations are available, each with its own purpose and output. For simplification, they are grouped into four major families:
Bucketing aggregations build buckets, where each bucket is associated with a key and a document criterion. Whenever the aggregation is executed, all the bucket criteria are evaluated on every document. Each time a criterion matches, the document is considered to “fall in” the relevant bucket.
Metric aggregations keep track of and compute metrics over a set of documents.
Matrix aggregations operate on multiple fields. They produce a matrix result out of the values extracted from the requested document fields. Matrix aggregations do not support scripting.
Pipeline aggregations aggregate the output of other aggregations and their associated metrics.
If you deal with Elasticsearch (or have log data that you want to query through), this tutorial will give you an idea of what you can do.
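To show how three of these families compose, the sketch below builds a search request body as a Python dictionary: a `terms` bucketing aggregation, an `avg` metric nested inside each bucket, and an `avg_bucket` pipeline aggregation over the metric. The field names `speaker` and `line_id` and the aggregation names are placeholders, not taken from the tutorial.

```python
import json

# Field names ("speaker", "line_id") are placeholders for illustration.
search_body = {
    "size": 0,  # return only aggregation results, no hits
    "aggs": {
        # Bucketing: one bucket per distinct speaker.
        "by_speaker": {
            "terms": {"field": "speaker"},
            "aggs": {
                # Metric: computed over the documents in each bucket.
                "avg_line": {"avg": {"field": "line_id"}}
            },
        },
        # Pipeline: consumes the metric output across all buckets.
        "avg_of_averages": {
            "avg_bucket": {"buckets_path": "by_speaker>avg_line"}
        },
    },
}
print(json.dumps(search_body, indent=2))
```

The printed JSON could be sent as the body of a `_search` request against an index that has these fields.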