Hadoop, Elasticsearch and Pig

What if the files and data are already in Hadoop HDFS? Is elasticsearch still useful? How does one create an index?

Consider a large number of PDF files, which need to be searched. As a first step, process each PDF file and store it as a record in an HDFS file. Then, you may experiment with two different but very simple approaches to create an index.

  • Write a simple Python mapper and use Hadoop map/reduce streaming to create an index.

  • Install the elasticsearch-hadoop plugin and create an index using a Pig script.

The environment for these experiments will be the same as in the earlier articles – three virtual machines, h-mstr, h-slv1 and h-slv2, each running HDFS and elasticsearch services.

Load PDF Files into Hadoop HDFS

Enter the following code in 'load_pdf_files.py'. Each PDF file is converted to a single line of text. Any tab characters are replaced so that there are no ambiguities when using a Pig script later. For each file, the output will be the path, a tab, the file name, a tab, and the text content of the file.

#!/usr/bin/python
from __future__ import print_function
import sys
import os
import subprocess

# Call pdftotext to convert the pdf file and store the result in /tmp/pdf.txt
def pdf_to_text(inpath, infile):
    exit_code = subprocess.call(['pdftotext', '/'.join([inpath, infile]), '/tmp/pdf.txt'], stderr=ErrFile)
    return exit_code, '/tmp/pdf.txt'

# Join all the lines of the converted pdf file into a single string.
# Replace any tabs in the converted document.
# Write the file as a single line, prefixing it with the path and the name.
def process_file(p, f):
    exit_code, textfile = pdf_to_text(p, f)
    if exit_code == 0:
        print("%s\t%s" % (p, f), end='\t')
        print("%s" % ' '.join([line.strip().replace('\t', ' ') for line in open(textfile)]))

# Generator for yielding pdf files
def get_documents(path):
    for curr_path, dirs, files in os.walk(path):
        for f in files:
            try:
                if f.rsplit('.', 1)[1].lower() == 'pdf':
                    yield curr_path, f
            except IndexError:
                # Skip files without an extension
                pass

# Start here
# Search for each file in the current path of type 'pdf' and process it
try:
    path = sys.argv[1]
except IndexError:
    path = '.'

# Use an error file for stderr to prevent these messages going to hadoop streaming
ErrFile = open('/tmp/err.txt', 'w')

for p, f in get_documents(path):
    process_file(p, f)

Now, you can run the above program on your desktop and load data into a file in Hadoop HDFS as follows:

$ ./load_pdf_files.py ~/Documents | HADOOP_USER_NAME=fedora \
hdfs dfs -fs hdfs://h-mstr/ -put - document_files.txt
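
You may want to confirm that the records reached HDFS before indexing. A quick check (assuming the same HADOOP_USER_NAME and file system options as above) is to list the file and look at its first record:

$ HADOOP_USER_NAME=fedora hdfs dfs -fs hdfs://h-mstr/ -ls document_files.txt
$ HADOOP_USER_NAME=fedora hdfs dfs -fs hdfs://h-mstr/ -cat document_files.txt | head -1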

Using Map/Reduce to Create an Index

Log into h-mstr as user fedora and enter the following code in 'indexing_mapper.py'.

#!/usr/bin/python
import sys
from elasticsearch import Elasticsearch

# Generator for yielding each line split into path, file name and the text content
def hdfs_input(sep='\t'):
    for line in sys.stdin:
        path, name, text = line[:-1].split(sep)
        yield path, name, text

# Create an index pdfdocs with fields path, title and text.
# Index each line received from Hadoop streaming.
def main():
    es = Elasticsearch(hosts='h-mstr')
    for path, name, text in hdfs_input():
        doc = {'path': path, 'title': name, 'text': text}
        es.index(index='pdfdocs', doc_type='text', body=doc)

if __name__ == "__main__":
    main()

Run the code with the following command on h-mstr:

$ hadoop jar /usr/share/java/hadoop/hadoop-streaming.jar \
-files indexing_mapper.py -mapper indexing_mapper.py \
-input document_files.txt -output es.out
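
Note that the streaming job will refuse to start if the output directory already exists, so if you need to re-run it, remove es.out first:

$ hdfs dfs -rm -r es.out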

The following URLs will give you information about the allocation and status of the index.
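For instance, the standard cat APIs report shard allocation and the status of the pdfdocs index created by the mapper (these particular endpoints are shown as one convenient choice):

$ curl "h-mstr:9200/_cat/allocation?v"
$ curl "h-mstr:9200/_cat/indices/pdfdocs?v"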

Using a Pig Script to Create an Index

The Fedora 20 repositories do not yet have the Pig distribution; it will be included in Fedora 21. So, download and install Pig from the Apache site (http://pig.apache.org/releases.html) on each of the virtual machines.

You will also need to install the elasticsearch-hadoop plugin on these systems. For example, you may run the following commands from h-mstr:

$ sudo /usr/share/elasticsearch/bin/plugin \
-u http://download.elasticsearch.org/hadoop/elasticsearch-hadoop-2.1.0.Beta2.zip \
-i elasticsearch-hadoop

$ ssh -t fedora@h-slv1 sudo /usr/share/elasticsearch/bin/plugin \
-u http://download.elasticsearch.org/hadoop/elasticsearch-hadoop-2.1.0.Beta2.zip \
-i elasticsearch-hadoop

$ ssh -t fedora@h-slv2 sudo /usr/share/elasticsearch/bin/plugin \
-u http://download.elasticsearch.org/hadoop/elasticsearch-hadoop-2.1.0.Beta2.zip \
-i elasticsearch-hadoop
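
The Pig script below registers the elasticsearch-hadoop jar from the plugin's dist directory, so it is worth confirming that the path exists on each node after installing the plugin, for example:

$ ls /usr/share/elasticsearch/plugins/hadoop/dist/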

The Pig script, indexing.pig, for creating the index is just four lines. The elasticsearch-hadoop jar file has to be registered. The Hadoop text file is loaded. The tuple (path, text, title) is then stored in (that is, indexed by) elasticsearch.

REGISTER /usr/share/elasticsearch/plugins/hadoop/dist/elasticsearch-hadoop-2.1.0.Beta2.jar;

A = LOAD 'document_files.txt' USING PigStorage() AS (path:chararray, title:chararray, text:chararray);

B = FOREACH A GENERATE path, text, title ;

STORE B INTO 'docs/text' USING org.elasticsearch.hadoop.pig.EsStorage();

Run the Pig script on h-mstr:

[fedora@h-mstr ~]$ pig indexing.pig

Once it completes, you can check the status of the indices and compare the pdfdocs index created earlier with the docs index created by the Pig script.

The ultimate test is to compare the results of the two indices. For example, you can browse the elasticsearch index, searching for 'python' in the content and displaying up to 25 hits, with only the values of the path and title fields returned.
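
One simple way to browse is the URI search form, sketched here for the docs index (the field names text, path and title are those used while indexing; change the index name to repeat the query against pdfdocs):

$ curl "h-mstr:9200/docs/_search?q=text:python&fields=path,title&size=25&pretty=true"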

The more flexible option is to use a JSON query string, as follows (for details, see http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/query-dsl.html):

curl "h-mstr:9200/pdfdocs/_search?pretty=true" -d '{

"fields": [“path”, "title"],

“size”: 25,

"query": { "query_string": { "query": "python" }}}'

If all has gone well, you should get the same answers for the queries, whether you use the docs or the pdfdocs index.
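
A quick, rough comparison (one possible check, not the only one) is to count the hits for the same query against both indices and verify that the numbers match:

$ curl "h-mstr:9200/pdfdocs/_count?q=text:python&pretty=true"
$ curl "h-mstr:9200/docs/_count?q=text:python&pretty=true"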