We recently did some work for Arachnys, who provide data on a wide range of emerging markets. Their data, gathered by a complex process of web crawling, is stored in HBase and served out of a 10-node Elasticsearch cluster. Periodically, better ways of extracting data from the raw crawl are implemented and the entire dataset needs to be reprocessed and reindexed. Data was being extracted from HBase using a map scan and written to Elasticsearch using the elasticsearch-hadoop plugin (maintained by Elastic). Flax was called in when it was discovered that with their existing configuration it would take about a month to reindex all of the data (over 1 billion documents, around 73 terabytes on disk). Indexing was also throwing large numbers of errors and moving forward at only about 700 documents per second.
Running some local tests with similar data showed Elasticsearch was indexing about 400 documents per second on a single node – a 10 node cluster on dedicated server hardware ought to be doing a lot better than 700 documents per second!
Our first step was to look at the errors being produced. Most were simple timeouts, but some included a ‘too many open files’ error message. This normally happens when the server Elasticsearch is running on has its ulimit set too low, but in this case the ulimit was set at 65535 file handles, which should be more than enough for a server with a single index.
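As an aside, the limit that matters is the one seen by the Elasticsearch JVM itself, which is not always the same as the one an interactive shell reports. A minimal sketch for checking it from inside a JVM (Oracle/OpenJDK on Unix only, since it relies on the non-standard com.sun.management API; the class name is invented for the example):

    import java.lang.management.ManagementFactory;

    import com.sun.management.UnixOperatingSystemMXBean;

    public class FileHandleCheck {
        public static void main(String[] args) {
            // The cast works on Oracle/OpenJDK Unix builds; other JVMs may not expose this bean.
            UnixOperatingSystemMXBean os =
                    (UnixOperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
            System.out.println("open file descriptors: " + os.getOpenFileDescriptorCount());
            System.out.println("max file descriptors:  " + os.getMaxFileDescriptorCount());
        }
    }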
To investigate this further we used the very nice Segment Spy plugin, which shows you the structure of Elasticsearch’s underlying Lucene index. A Lucene index is divided into multiple immutable segments, and a normal index will probably have around 30 segments of varying sizes. Segment Spy revealed that our index contained several thousand very small segments. Each open segment can use between 10 and 15 file handles, which would explain our ‘too many open files’ error.
To understand how this could happen we need to consider how Lucene manages incremental indexing. As documents are added to a Lucene index, they are buffered in memory until a commit is triggered (either explicitly, or by exceeding memory or document-count limits), at which point they are written out as a segment. Lucene also allows concurrent indexing: your indexing process can have multiple threads, each with their own document writer, and when a writer’s buffer is filled it is swapped out for a new, empty writer, while the filled one is flushed to disk.
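To make this concrete, here is a minimal sketch using the underlying Lucene API directly (Lucene 5.x-style signatures; the index path and field name are invented for the example). Documents accumulate in the writer's RAM buffer, and a commit writes whatever is buffered out as a new segment:

    import java.nio.file.Paths;

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.TextField;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    public class SegmentDemo {
        public static void main(String[] args) throws Exception {
            Directory dir = FSDirectory.open(Paths.get("/tmp/segment-demo"));
            IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
            config.setRAMBufferSizeMB(256);            // flush a new segment once the buffer fills
            try (IndexWriter writer = new IndexWriter(dir, config)) {
                Document doc = new Document();
                doc.add(new TextField("body", "an example document", Field.Store.NO));
                writer.addDocument(doc);               // held in the in-memory buffer
                writer.commit();                       // buffered documents written out as a segment
            }
        }
    }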
Once a segment has been written another process will check the state of the index as a whole and decide if certain segments can be merged together. This merging process usually happens concurrently (in a background thread) and should prevent the scenario we were seeing above by ensuring that multiple small segments are quickly merged together into larger ones.
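Lucene exposes this behaviour through a merge policy (which segments get merged) and a merge scheduler (how the merges are run). A hedged sketch of the relevant knobs, with purely illustrative values rather than recommendations:

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.index.ConcurrentMergeScheduler;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.TieredMergePolicy;

    public class MergeConfigDemo {
        public static void main(String[] args) {
            TieredMergePolicy mergePolicy = new TieredMergePolicy();
            mergePolicy.setSegmentsPerTier(10.0);     // similar-sized segments allowed per tier before a merge triggers
            mergePolicy.setMaxMergeAtOnce(10);        // segments combined in a single merge

            ConcurrentMergeScheduler scheduler = new ConcurrentMergeScheduler();
            scheduler.setMaxMergesAndThreads(4, 2);   // pending merges / background merge threads

            IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
            config.setMergePolicy(mergePolicy);
            config.setMergeScheduler(scheduler);
        }
    }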
Merging can be I/O intensive and so Elasticsearch has a feature called merge throttling that will try and ensure that enough system resources are available to serve queries. In our case, it seems that the throttling meant that the merge simply couldn’t keep up with the number of indexing threads.
There were two overlapping issues here: too much concurrent indexing, and merges not keeping up. The HBase scan was divided into 256 separate mappers, and as no use was being made of document routing, each mapper job would end up sending documents to each node. This level of concurrency was clearly too high.
By reducing the number of map jobs to 10 and switching off merge throttling, the indexing rate jumped to approximately 4000 documents per second. This was good – but we wanted to see if we could improve it still further! There was still a fair amount of idle CPU time across the cluster, suggesting that further gains in document throughput were available.
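For reference, in Elasticsearch 1.x merge (store) throttling is controlled by the indices.store.throttle.type setting; assuming a 1.x-era cluster, switching it off from the Java API looks roughly like the sketch below (later releases removed store throttling entirely, so the setting and the builder class differ there):

    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.ImmutableSettings;

    public class DisableMergeThrottling {
        // Switch off store/merge throttling cluster-wide (Elasticsearch 1.x-era API).
        public static void disable(Client client) {
            client.admin().cluster().prepareUpdateSettings()
                  .setTransientSettings(ImmutableSettings.settingsBuilder()
                          .put("indices.store.throttle.type", "none")
                          .build())
                  .get();
        }
    }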
The elasticsearch-hadoop plugin is designed to be very small and self-contained, with as few dependencies as possible. It uses a single-threaded model for indexing: records are batched up, sent in bulk, and the writer then blocks until Elasticsearch responds. We replaced this with our own Hadoop OutputFormat implementation using an Elasticsearch TransportClient and BulkProcessor. The BulkProcessor uses a threaded model that sends bulk requests to Elasticsearch using a configurable number of background threads, meaning that there is little or no time spent waiting for responses. This makes error handling a little trickier (although doable), but we found that it doubled indexing speed again, and this time CPU was pretty much maxed out across the cluster.
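As an illustration only (a simplified sketch rather than the actual implementation, assuming a 1.x/2.x-era Java client), a Hadoop RecordWriter built around a BulkProcessor might look roughly like this. The EsRecordWriter class name, the "docs" index and the bulk sizes are all invented for the example, and real error handling would need to do rather more than log failures:

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Client;

    public class EsRecordWriter extends RecordWriter<NullWritable, Text> {

        private final Client client;
        private final BulkProcessor processor;

        public EsRecordWriter(Client client) {
            this.client = client;
            this.processor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
                @Override public void beforeBulk(long id, BulkRequest request) { }

                @Override public void afterBulk(long id, BulkRequest request, BulkResponse response) {
                    if (response.hasFailures()) {                  // partial failures still need handling
                        System.err.println(response.buildFailureMessage());
                    }
                }

                @Override public void afterBulk(long id, BulkRequest request, Throwable failure) {
                    System.err.println("bulk request failed: " + failure);
                }
            })
            .setBulkActions(5000)        // documents per bulk request (illustrative)
            .setConcurrentRequests(4)    // bulks in flight at once, sent from background threads
            .build();
        }

        @Override
        public void write(NullWritable key, Text json) {
            // each record is assumed to already be a JSON document
            processor.add(new IndexRequest("docs", "doc").source(json.toString()));
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException {
            try {
                processor.awaitClose(5, TimeUnit.MINUTES);   // flush and wait for outstanding bulks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                client.close();
            }
        }
    }

The gain comes from setConcurrentRequests: the writer keeps filling the next bulk while earlier ones are still in flight, which removes the blocking wait described above.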
Turning off replication gave an added boost, and our final tests gave a reindex time of approximately 17 hours for plain indexing – a marked improvement on the initial 30 day turnaround time!
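For completeness, replication can be dropped per index before a bulk load and restored afterwards; a hedged sketch against a 1.x-era Java client, with the "docs" index name again invented for the example:

    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.ImmutableSettings;

    public class ReplicaToggle {
        // Drop to zero replicas for the duration of the bulk load...
        public static void disableReplicas(Client client, String index) {
            client.admin().indices().prepareUpdateSettings(index)
                  .setSettings(ImmutableSettings.settingsBuilder()
                          .put("index.number_of_replicas", 0)
                          .build())
                  .get();
        }

        // ...and restore them once indexing has finished.
        public static void restoreReplicas(Client client, String index, int replicas) {
            client.admin().indices().prepareUpdateSettings(index)
                  .setSettings(ImmutableSettings.settingsBuilder()
                          .put("index.number_of_replicas", replicas)
                          .build())
                  .get();
        }
    }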