Bulk load data into HBase


We had a use case where we had to dump events from a Kafka stream into HBase. We were using Spark to capture the events per second and store them in HBase. The problem was that we were getting at least 100,000 events/second and our target was way more than that. HBase was taking its own sweet time doing a put per event. It was obvious that we had to consider bulk loading the data into HBase. I will not go into why bulk loading is faster than normal loading of data; you can read about it in this cloudera blog.

That blog outlines three steps in a bulk load –

  1. Extract the data into HDFS
  2. Convert the data into HFiles, the format that HBase stores data in
  3. Inform HBase about the location of these HFiles

What I will write about here is how to convert those three steps into code that works. I am using Spark in the examples.

Extract the data into HDFS

This is actually the easiest step of the three and the one that is completely independent of HBase. In our case, we had to pull data from Kafka and store it in an RDD for that time window. At the end of this step, you should have the data to be loaded into HBase ready to be processed on HDFS. Just for the sake of this discussion, I will use a dummy dataset.

//a simple data set of strings
val rdd = sc.parallelize(List("a", "b", "c", "d"))

Convert the data into HFiles

You can actually write a mapreduce program to complete this step. This should give you some pointers to get started.

This is the longest step of the three. We can break it down into subtasks –

Partition the data to match HBase region servers

We want to partition the data to match the partitioning of the HBase region servers. Since we are using Spark RDDs, Spark partitions the dataset based on the hash code of each record. HBase region servers, on the other hand, partition data based on the row key associated with each row. You can consider this task optional, but it will be helpful later.

import org.apache.spark.HashPartitioner

val saltMod = 2 //number of salt buckets; ideally this matches the partitioning of the target table
//zipWithUniqueId attaches a unique ID to every record of the rdd
//repartitionAndSortWithinPartitions partitions the data by the salted key
//and sorts the records within each partition
val partitioned = rdd.zipWithUniqueId()
 .map({ case (v, k) => (Math.abs(k.hashCode()) % saltMod, v) })
 .repartitionAndSortWithinPartitions(new HashPartitioner(saltMod))

What we did is simple: we attached a unique ID to every record of the RDD and made a new key out of that ID (salting). Based on this new key, we re-partitioned the RDD.

Map the data into an HBase writable format

When I say HBase writable format, I mean an implementation of the org.apache.hadoop.io.Writable interface. In our example, we will convert the data to the org.apache.hadoop.hbase.KeyValue class.

Of course, before we think about bulk loading the data, we need to have an HBase table ready. I bring that up now because we will use the table name and column family name in the code.
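If the table does not exist yet, here is a minimal sketch of creating it with the HBase Admin API (the table and column family names match the declarations used below; adjust them for your setup, or just create the table from the hbase shell):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

//sketch: create the table once, before the bulk load job runs
val adminConf = HBaseConfiguration.create()
val adminConnection = ConnectionFactory.createConnection(adminConf)
val admin = adminConnection.getAdmin
val descriptor = new HTableDescriptor(TableName.valueOf("dummy_table"))
descriptor.addFamily(new HColumnDescriptor("dummy_cf"))
if (!admin.tableExists(descriptor.getTableName)) admin.createTable(descriptor)
admin.close()
adminConnection.close()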

import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

//let's get declarations out of the way
val tableName = "dummy_table"
val columnFamilyName = "dummy_cf"
val qualifierName = "dummy_message"

//let's convert the data into (row key, KeyValue) pairs
val transformed = partitioned.map({
     case (k, v) =>
         val key = Bytes.toBytes(v.hashCode())
         val kv = new KeyValue(key,
                         Bytes.toBytes(columnFamilyName),
                         Bytes.toBytes(qualifierName),
                         Bytes.toBytes(v))
         (new ImmutableBytesWritable(key), kv)
 })

Ready the configuration for HBase

Most of the configuration used here is pretty standard, except for the fs.permissions.umask-mode property. This property determines the permissions of the HFiles created at the end of this step. Setting it to 000 makes sure that the HFiles are created with 777 permissions. If that is not desirable, you need to choose an appropriate umask value. Permissions are important in this context because this Spark job runs as you (your user), so the HFiles created at the end of this step will be owned by you. But when you inform HBase of these HFile locations, HBase will run the bulk load operation as its own user (hbase). If that user doesn't have permission to access the HFiles, the job will fail. And yes, the HFiles can be deleted once HBase completes its operation.

 import org.apache.hadoop.hbase.HBaseConfiguration
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

 val conf = HBaseConfiguration.create()
 conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
 conf.set("mapreduce.outputformat.class", "org.apache.hadoop.hbase.mapreduce.TableOutputFormat")
 conf.set("mapreduce.job.output.key.class", "org.apache.hadoop.hbase.io.ImmutableBytesWritable")
 conf.set("mapreduce.job.output.value.class", "org.apache.hadoop.hbase.KeyValue")
 conf.set("hbase.zookeeper.quorum", "localhost")
 conf.set("zookeeper.session.timeout", "10000")
 conf.set("zookeeper.recovery.retry", "3")
 conf.set("hbase.zookeeper.property.clientPort", "2181")
 //umask 000 => the HFiles get 777 permissions, so the hbase user can read them
 conf.set("fs.permissions.umask-mode", "000")

Save the data in HFiles

While saving the HFiles, you need to provide the classes used for the key and the value, as well as the output format. HBase thinks of everything in terms of bytes, so there has to be a way to track what those bytes originally were; that is why we provide these classes.

import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2

val path = "/tmp/hbase-bulk-load"
transformed.saveAsNewAPIHadoopFile(path,
    classOf[ImmutableBytesWritable],
    classOf[KeyValue],
    classOf[HFileOutputFormat2],
    conf)

Inform HBase about the HFiles


Using the same configuration as before, we first create an HFile loader. Next, we get an instance of the HBase table that we want to load the data into, and then we call the bulk load operation.

 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hbase.TableName
 import org.apache.hadoop.hbase.client.{ConnectionFactory, HTable}
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

 val loader = new LoadIncrementalHFiles(conf)
 val connection = ConnectionFactory.createConnection(conf)
 val table = connection.getTable(TableName.valueOf(tableName))
                       .asInstanceOf[HTable]
 try {
     loader.doBulkLoad(new Path(path), table)
 } finally {
     table.close()
     connection.close()
 }
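
To convince yourself that the load worked, you can read one of the rows back before closing the table. A quick sketch (remember that we used each string's hashCode as its row key):

import org.apache.hadoop.hbase.client.Get

//"a" was one of the strings we loaded, so its row key is Bytes.toBytes("a".hashCode())
val result = table.get(new Get(Bytes.toBytes("a".hashCode())))
val value = result.getValue(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifierName))
println(Bytes.toString(value)) //should print "a"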


Running a druid ingestion task


Druid is an open source data store designed for low-latency exploratory analytics on large amounts of data. It combines a columnar storage layout, a distributed shared-nothing architecture and an advanced indexing structure to ensure sub-second latencies even while exploring billions of records. Druid is very good for managing immutable but append-heavy data.

The druid website offers in-depth documentation on how to use druid with an example wikiticker dataset. The idea of this post is to walk you through the issues I ran into and how I got around them.

I have used Druid 0.9.2 for running these examples. We will use druid with HDFS, and we will load the data into druid from files.

Ready the data

Download the data from this link. The link is not mentioned in the documentation; it is available in the project’s git. Download the wikiticker gz file, extract it and upload it to HDFS. The dataset is in JSON format. Every JSON object in the dataset looks like this –

{
 "time": "2015-09-12T23:59:59.200Z",
 "channel": "#en.wikipedia",
 "cityName": null,
 "comment": "(edited with [[User:ProveIt_GT|ProveIt]])",
 "countryIsoCode": null,
 "countryName": null,
 "isAnonymous": false,
 "isMinor": false,
 "isNew": false,
 "isRobot": false,
 "isUnpatrolled": false,
 "metroCode": null,
 "namespace": "Main",
 "page": "Tom Watson (politician)",
 "regionIsoCode": null,
 "regionName": null,
 "user": "Eva.pascoe",
 "delta": 182,
 "added": 182,
 "deleted": 0
}


Ingest the data

The way to interact with druid is through REST APIs. If you ask me, that is a good way to go because you can access druid services from any machine in the cluster without actually installing anything on that machine; it just needs to be able to reach the URL. There is also no language dependency, i.e. you are free to use the REST services from any language you like. The downside is that it is a bit painful to write druid queries by hand; the same is easier programmatically (a short sketch of a programmatic POST follows the curl example below). You can learn about the ingestion formats that druid accepts from here.

Let us load the data into druid.

{
  "type" : "index_hadoop",
  "spec" : {
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "paths" : "wikiticker-2015-09-12-sampled.json"
      }
    },
    "dataSchema" : {
      "dataSource" : "wikiticker",
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "day",
        "queryGranularity" : "none",
        "intervals" : ["2015-09-12/2015-09-13"]
      },
      "parser" : {
        "type" : "hadoopyString",
        "parseSpec" : {
          "format" : "json",
          "dimensionsSpec" : {
            "dimensions" : [
              "channel",
              "cityName",
              "comment",
              "countryIsoCode",
              "countryName",
              "isAnonymous",
              "isMinor",
              "isNew",
              "isRobot",
              "isUnpatrolled",
              "metroCode",
              "namespace",
              "page",
              "regionIsoCode",
              "regionName",
              "user"
            ]
          },
          "timestampSpec" : {
            "format" : "auto",
            "column" : "time"
          }
        }
      },
      "metricsSpec" : [
        {
          "name" : "count",
          "type" : "count"
        },
        {
          "name" : "added",
          "type" : "longSum",
          "fieldName" : "added"
        },
        {
          "name" : "deleted",
          "type" : "longSum",
          "fieldName" : "deleted"
        },
        {
          "name" : "delta",
          "type" : "longSum",
          "fieldName" : "delta"
        },
        {
          "name" : "user_unique",
          "type" : "hyperUnique",
          "fieldName" : "user"
        }
      ]
    },
    "tuningConfig" : {
      "type" : "hadoop",
      "partitionsSpec" : {
        "type" : "hashed",
        "targetPartitionSize" : 5000000
      },
      "jobProperties" : {}
    }
  }
}

Now, POST this ingestion task to the overlord instance –

curl -X 'POST' \
-H 'Content-Type:application/json' \
-d @my-index-task.json \
[druid-overlord-host]:8090/druid/indexer/v1/task
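
Since this is just an HTTP call, you can POST the same task from code as well. Below is a minimal sketch in Scala using only the JDK's HttpURLConnection; the overlord host and the JSON file path are placeholders, so adjust them for your cluster.

import java.net.{HttpURLConnection, URL}
import java.nio.file.{Files, Paths}

//read the index task JSON from disk
val taskJson = new String(Files.readAllBytes(Paths.get("my-index-task.json")), "UTF-8")

//POST it to the overlord's task endpoint
val url = new URL("http://druid-overlord-host:8090/druid/indexer/v1/task")
val conn = url.openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("POST")
conn.setRequestProperty("Content-Type", "application/json")
conn.setDoOutput(true)
val out = conn.getOutputStream
out.write(taskJson.getBytes("UTF-8"))
out.close()

//a 200 response means the overlord accepted the task
println(s"overlord responded: ${conn.getResponseCode}")
conn.disconnect()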

You can look at the progress and logs of this index task in the coordinator console. The UI is convenient for tracking the progress of the task, but if the configuration is not set up right, you may not see the logs. And if you cannot see the logs, you don’t know what went wrong when the task fails. So let’s take a step back and fix the configuration if the logs are not showing up. If you are trying out druid for the first time like me, you will most likely run into the same problems I did.

No log was found for this task. The task may not exist, 
or it may not have begun running yet.

Add the following to conf/druid/middleManager/runtime.properties on every server where the middle manager service is running

druid.selectors.indexing.serviceName=druid:overlord
druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=/tmp/druid-logs

and the following to conf/druid/overlord/runtime.properties on the server where the overlord service is running.

druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=/tmp/druid-logs

This will inform druid to dump the task logs at /tmp/druid-logs on HDFS. And of course, you need to give the user running the druid services write access to that location on HDFS. For the new configuration to take effect, you need to restart the services. Try POSTing the task again after restarting; this time you should be able to see the logs in the UI and also on HDFS at /tmp/druid-logs.

It is very likely that your task still failed at this point, but since you can now look at the log, it is easy to figure out what went wrong. The first issue I had after fixing the logs was that druid could not find the hadoop dependencies.

com.metamx.common.ISE: Hadoop dependency didn't exist
...

This one is actually easy to figure out. Look at the value of druid.extensions.hadoopDependenciesDir in conf/druid/_common/common.runtime.properties. This value informs druid where the hadoop dependencies are located. Since it is configured to the wrong location, druid cannot find them and hence cannot progress. The hadoop dependencies are located in the home directory of druid, i.e. the directory where the druid tar was extracted. Change that value to the correct location and this error should go away.
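
For reference, the relevant entry in conf/druid/_common/common.runtime.properties would look something like this (the install path is just an example; point it at wherever you extracted the druid tar):

druid.extensions.hadoopDependenciesDir=/opt/druid-0.9.2/hadoop-dependencies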

The next one up is this –

Error: 
class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer 
overrides final method 
deserialize.(Lcom/fasterxml/jackson/core/JsonParser;
  Lcom/fasterxml/jackson/databind/DeserializationContext;)
Ljava/lang/Object;

This error is due to the fact that hadoop and druid use conflicting fasterxml jackson versions. As a matter of fact, there are many other dependencies druid uses that are already present on hadoop. One common way to solve this issue is to build your own fat jar from the druid source; if you choose to do that, you might want to use this sbt file. There is, however, another way to solve this: druid provides a way to pass properties to the mapreduce job. Modify the index task to include these new job properties.

"jobProperties": {
"mapreduce.job.classloader": "true",
"mapreduce.job.classloader.system.classes": "-javax.validation.,java.,javax.,org.apache.commons.logging.,org.apache.log4j.,org.apache.hadoop."
}

The property mapreduce.job.classloader asks hadoop to use separate classloaders for the hadoop and druid dependencies, i.e. in case of a conflict, it asks hadoop to load the classes from the jars submitted with the druid job rather than the classes present in the system classpath. The property mapreduce.job.classloader.system.classes is the exclusion rule for the previous property: it defines which classes are still loaded from the system classpath.

If you want to explore other ways to deal with this issue, visit the druid documentation on github. I actually arrived at the solution from this google-groups discussion.

If druid is actually set up right on your cluster, your index task should run fine after the last fix. For me, the mapreduce job got stuck at 100% reduce for quite a long time and all the reduce tasks were failing due to timeouts. When I checked the yarn logs, that was when I realized this last error was an access issue. Druid writes the segments to /druid on HDFS by default. It was failing because the user running the druid service did not have write access to that directory. I fixed that, and it was all fine.
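
If you hit the same thing, the fix is a couple of hdfs commands along these lines (the druid user name here is an assumption; use whichever user actually runs your druid services):

hdfs dfs -mkdir -p /druid
hdfs dfs -chown -R druid /druid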

If you are interested in learning more about the druid architecture and its inner workings, you should read this well-written paper.
