DaveWentzel.com            All Things Data

Data Architecture

Apache Flume

During my evaluation of NoSQL solutions the biggest hurdle I had, by far, was loading data into Hadoop.  The easiest way I found to do this was using Apache Flume.  But it took me a long time to figure out that my approach to data loading was wrong.  As an RDBMS data architect I was biased toward loading Hadoop with the same techniques I would use to load a SQL Server.  I first tried scripting (which is how most NoSQL solutions have you load their sample data, but which tends not to work well with your "real" data), but the learning curve was too high for a proof-of-concept.  I then tried ETL tools like Informatica and had better success, but it was still too cumbersome.  

I began thinking like a NoSQL guy and decided to use HiveQL.  I had better success getting the data into Hadoop...but now I had to get the data out of my RDBMS in a NoSQL-optimized format that I could quickly use HiveQL against.  

As my journey continued I thought about how I would get my data into Hadoop if we ever deployed it as a real RDBMS complement.  We would probably write something that "listened" for "interesting" data on the production system and then put it into Hadoop.  That listener is Apache Flume.  Why not just point Flume at our Service Broker events and scrape the data that way?  I had that up and running in a few hours.  

Flume works by having an agent running on a JVM listen for events (such as Service Broker messages relayed to a JMS system).  The events are queued up into a "channel".  The channel produces new outgoing events to Hadoop/HDFS/HBase (or whatever you use for persistence).  So, why put a channel in the middle?  Flexibility and asynchronicity.  Channels have disaster recovery mechanisms built in.  As your throughput needs change you can configure your agents to fan in (more agents talk to fewer channels) or fan out (fewer agents talk to more channels).  The former is great if you have, for instance, multiple listeners that only need to talk to one Hadoop instance.  The latter is good if you have one source system that needs to talk to multiple Hadoop instances, or if you need to route messages to a standby channel while you do maintenance on your Hadoop instance.  
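An agent's sources, channels, and sinks are wired together in a plain properties file.  A minimal sketch, with entirely hypothetical names and paths (a real JMS source also needs connection properties such as the context factory, provider URL, and destination):

```
# Hypothetical agent "agent1": a JMS source feeding a durable
# file channel, drained by an HDFS sink.
agent1.sources = jms-src
agent1.channels = ch1
agent1.sinks = hdfs-sink

agent1.sources.jms-src.type = jms
agent1.sources.jms-src.channels = ch1

# File-backed channel: queued events survive an agent crash.
agent1.channels.ch1.type = file

agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.channel = ch1
agent1.sinks.hdfs-sink.hdfs.path = /flume/events/%Y-%m-%d
```

Fan-in and fan-out fall out of this naturally: you declare more sources pointing at one channel, or more channels and sinks fed by one source.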

This means that Flume is very scalable and can handle constant data streams effectively without worry of data loss.  This is great for something like streaming stock quotes.  Flume is by far the easiest way to load data quickly into Hadoop.  

More NoSQL Solutions

In this post in my NoSQL series I wanted to briefly cover a few remaining NoSQL options.  I tested these briefly but did not tackle a full proof-of-concept with any of them due to time constraints.  

Memcached is a giant associative in-memory array that can be scaled out and sharded across multiple nodes.  I've used it in the past as a caching mechanism for websites I've created, with great success.  We didn't feel in-memory was a good option for our requirements.  

Cassandra was originally an internal Facebook project.  It too is a distributed key/value store, with a data model similar to BigTable's.  Balanced sharding is native.  Querying is via Hadoop.  It is optimized for writes over reads, which is a big difference from other distributed key/value stores.  

Redis is architecturally similar to Memcached.  I use it extensively for smaller web projects.  It runs entirely in RAM but is persisted to disk.  My only complaint is that you get really excellent performance one minute and horrendous performance the next.  There is no gradual degradation of performance; this happens when your data set can no longer fit fully into available RAM.  When I notice it happening I restart Redis, which unfortunately blows away my cache, and everything hums again.  Sharding is your responsibility if you choose to scale out.
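One way to soften that performance cliff is to cap Redis's memory and let it evict least-recently-used keys instead of degrading once the working set outgrows RAM.  A redis.conf sketch (the 2gb figure is purely illustrative):

```
# Cap memory and evict LRU keys rather than degrade or swap.
maxmemory 2gb
maxmemory-policy allkeys-lru
```

Eviction trades cache completeness for predictable latency, which for a cache is usually the right trade.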

Querying NoSQL with Hive and Pig

This is my next post in my evaluation of NoSQL solutions.  I want to cover Hive and Pig today, which are popular querying tools.  First, it is important to understand that with NoSQL solutions you need specific, predefined ways of analyzing and accessing your data.  Ad hoc querying and "advanced" query expressions that you would find in SQL (UNION, INTERSECT, etc.) are generally not a high priority in the big data world and are not implemented very well in the current solutions.  We'll get there some day, but for now the space is evolving so quickly that the maturity just isn't there yet.  Heck, even SQL Server only began supporting INTERSECT as of 2005.  The current query tools, although lacking the "advanced" features, are very good at what they are designed to do, which is managing large data sets.  


During my evaluation I spent most of my time working with Hive because I found the ramp-up time and tooling much better than Pig's.  Hive is very SQL-like, rides on top of Hadoop, and is therefore geared to abstracting away the complexities of querying large, distributed data sets.  Under the covers Hive uses HDFS/Hadoop/MapReduce but, again, abstracts away the technical implementation of data retrieval, just as SQL does.  

But, if you remember from my "MapReduce for the RDBMS Guy" post, MapReduce works as a job scheduling system, coordinating activities across the data nodes.  This has a substantial effect on query response times.  Therefore, Hive is not really meant for real-time ad hoc querying.  There's lots of latency.  Lots.  Even small queries on dev-sized systems can be orders of magnitude slower than a similar query on an RDBMS (but you really shouldn't compare the two this way).  

To further exacerbate the perceived performance problem, there is no query caching in Hive.  Repeat queries are re-submitted to MapReduce.  

So, if the performance is so bad, why does everyone expound on the virtues of these solutions?  As data sets get bigger, the overhead of Hive is dwarfed by the scale-out efficiencies of Hadoop.  Think of this as the equivalent of table scans in SQL Server.  Generally we all hate table scans and instead try to find a way to do index seeks.  But eventually we all hit a query-tuning moment when we realize that a table scan is sometimes really better than billions of seeks.  Remember that Hive is optimized for BigData and batch processing, so touching every row is optimal.  


The HiveQL syntax is closely modeled on ANSI SQL.  Here's an example:
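A minimal, hypothetical HiveQL session (the table, columns, and path are invented for illustration):

```sql
-- Define a table over tab-delimited web-hit data
CREATE TABLE hits (ip STRING, url STRING, hit_date STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

-- Load a file already sitting in HDFS (e.g., landed there by Flume)
LOAD DATA INPATH '/flume/events/2013-01-01' INTO TABLE hits;

-- Top 10 URLs by hit count -- this compiles down to MapReduce jobs
SELECT url, COUNT(*) AS hit_count
FROM hits
GROUP BY url
ORDER BY hit_count DESC
LIMIT 10;
```

Any RDBMS person can read that query cold, even though under the covers it runs as MapReduce jobs rather than a query plan.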

HiveQL even supports joins and can generate EXPLAIN plans (query plans).  
Hmmm...seems like NoSQL isn't so not-SQL after all.  

Prunes Analysis vs Fletcher's Castoria

Did you ever have somebody ask you:  

  • At what fragmentation level should I REORG vs REBUILD?
  • How many indexes should I have on my table?  
  • How often should I update my statistics?
  • Should I change my oil every 3000 miles or 5000?
  • What is the correct federal funds rate to ensure full employment and low inflation?  

I call these "Prunes Questions".  And I call the process of arriving at a satisfactory answer to these questions "Prunes Analysis".  I named this process after the famous Fletcher's Castoria commercial from the early '60s.  

Just like the commercial says...The problem with prunes is...if I'm constipated, is 3 enough?  Is 6 too many?  

This question haunted housewives and moms for years until the advent of Fletcher's Castoria in the mid-1800s.  Generally, the best laxative at the time was a good dose of prunes.  But how many do you eat?  If you don't eat enough, you're still constipated.  If you eat too many you'll have diarrhea.  

I know, lovely thought.  Fletcher's was supposed to eliminate this enigma.  You just give your kid the dose on the bottle and forget about it.  But of course, even with Castoria you might need to take a second and third dose.  

Where am I going with this?  People, especially IT people, like every question to have a firm (pun intended?) answer.  Given a choice between some prunes and a proper dose of Castoria, most people will take the Castoria every time.  People think of Castoria as the "known" and the prunes as the "unknown".  Known is viewed as being better.  

But not every problem has one "known" answer.  Sometimes "it depends" is a valid answer.  The questions I posited at the start of this post are examples of "prunes" questions.  Every person may have a slightly different answer, and who is to say who is right and who is wrong?  The answers are often difficult to prove.  In every case the "it depends" refers to all of the variables that cannot be held constant.  

But most people hate being told "it depends" as the answer to their query.  They view the tergiversator (an ambiguous respondent) as lacking knowledge or being standoffish.  But that's not always the case.  Answering "it depends" can be especially perilous if you are a consultant.  Clients like to know that their consultants have all of the answers.  

So, unless I'm trying to be purposely equivocal, my stock answer to these types of questions is, "Is 3 enough?  Is 6 too many?"  This answer makes the questioner stop and think.  I'll then give my answer/opinion, given the knowns, without having to say "it depends."  It's usually good for a chuckle too, especially if the questioner is a bit older.  

MongoDB and CouchDB

This is my next post in my evaluation of NoSQL solutions.  MongoDB is a document store that can persist arbitrary collections of data as long as they can be represented using a JSON-like object hierarchy.  Mongo comes from the word "humongous", clearly indicating that its intended purpose is BigData.

Parallel Data Warehouse as a NoSQL Alternative

Another post in my NoSQL series...this one on Microsoft's Parallel Data Warehouse...PDW for short.  This is an installed appliance delivered right to your door with everything completely set up for you.  

MapReduce for the RDBMS Guy

MapReduce is a parallel programming framework for executing jobs against multiple data sets.  MapReduce is the framework Google uses to query multiple nodes in a partition-tolerant datastore.  Google owns the patent on the MapReduce framework, but the concepts are freely shared and there are a number of open-source implementations.  

MapReduce's concepts come from the functional programming world.  A map function applies an operation to each element in a list, generally returning another list.  It takes a key-value pair and emits another key-value pair.  Example...the function "plus-one" applied to the list [0,1,2,3,4] would yield a new list, [1,2,3,4,5].  The original list's data is immutable (never changed), so I need not be concerned with concurrency and locking.  So multiple map threads could work on this list if the list were partitioned across multiple nodes.  The map function runs on each subset of a collection local to a distributed node.  The map operation on any one node is completely independent of the same map operation running on another node.  This isolation is how you get cheap, effective parallel processing.  And if any node fails, the map function can be restarted on a redundant node.  

Functional programming also has the concept of a reduce function, more commonly known as a "fold" function, though you sometimes hear them called "accumulate" or "inject" functions.  A fold function applies a calculation to the key-value pairs and produces a single result, almost like the SUM or AVG aggregate functions in RDBMS-land.  A fold (or reduce) function summing the list [1,2,3,4,5] would yield a result of 15.  
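The plus-one and fold examples above can be sketched in a few lines of Python:

```python
from functools import reduce

# Map: apply "plus-one" to each element; the input list is never mutated.
original = [0, 1, 2, 3, 4]
mapped = list(map(lambda x: x + 1, original))   # [1, 2, 3, 4, 5]

# Reduce (fold): accumulate the mapped values into a single result.
total = reduce(lambda acc, x: acc + x, mapped)  # 15

print(mapped, total)
```

Because `original` is never mutated, nothing stops you from running the map over different slices of the list on different threads or nodes.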

Map and reduce functions are used together to process lists of data.  The maps may run a simple calculation on many nodes of the datastore.  The result may be piped to a reducer, which may run on a single master node that generates the query's answer.  The map generally must run before the reduce, but the underlying code generally runs computations in parallel.  

Maps and reduces can be thought of as algorithms.  The maps encode the relationships and hierarchies among the data, so it is critical to performance that they are designed properly.  A map will generally "scan" the data, much like an RDBMS index scan.  Maps will generally not seek.  

A map may pass its data to another map; there can be more than one map function prior to the reduce being called.  Similarly, a reduce function can be the target of another reducer.  

When your querying requirements get more complex it is generally not a good idea to make your map and reduce jobs more complex.  Instead you should add more mappers and reducers.  So, add more jobs rather than making the existing jobs more complex.  

A Better Example

Think of a partitioned datastore of customers on 64 nodes.  We want to know how many customers reside in Pennsylvania.  A Java jock working with a traditional RDBMS may do something stupid like bring back all customers from each node/database and then look for the "Pennsylvanians" on the client tier.  That's wildly inefficient.  With MapReduce the requester will issue a "map" to each node that says, "Please give me a count of your node's customers in PA."  Each node sends the data back to the requester, which "reduces" the responses by saying, "Now give me a sum of all nodes' counts."  That's actually kinda efficient.  
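The Pennsylvania example can be sketched as a toy in Python (the "nodes" and customer data are invented; real MapReduce runs the map on each node's local shard):

```python
# Each inner list stands in for one node's shard of the customer data.
nodes = [
    [{"name": "Ann", "state": "PA"}, {"name": "Bob", "state": "NJ"}],
    [{"name": "Cal", "state": "PA"}, {"name": "Dee", "state": "PA"}],
]

def map_count_pa(shard):
    # Runs independently on each node; emits one local count.
    return sum(1 for c in shard if c["state"] == "PA")

def reduce_sum(counts):
    # Runs on the requester; combines the per-node results.
    return sum(counts)

pa_total = reduce_sum(map_count_pa(shard) for shard in nodes)
print(pa_total)  # 3
```

Only one small integer per node crosses the wire, rather than every customer row.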

Is MapReduce that much different than what we do in RDBMSs today?

In an RDBMS the paradigm is commonly "pass the data to the algorithm" (i.e., send the customers table to the JVM to be filtered there for the customers from PA).  But in MapReduce the paradigm is reversed..."pass the algorithm to the data".  Yeah, yeah, this is really flimsy, but this is how most people think about it.  I tend to think of the map as the equivalent of a stored procedure in the RDBMS.  And if you remember that we are running the MapReduce on many, many nodes, then it rings a little more true.  
I think of reduce functions as something akin to an aggregation operation.  Aggregation reduces the number of rows in an RDBMS, and a reducer does the same thing: it reduces the amount of data traveling to the next step.  
Some MapReduce implementations have a "link" function as well which is equivalent to a JOIN condition in an RDBMS. 
Microsoft Daytona

MS has a map/reduce framework for Azure called Daytona.  

Windows Azure Table Service

Another post in my NoSQL series.  Microsoft offers NoSQL-like data persistence engines outside of SQL Server.  The most compelling is Windows Azure Table Service.  This is a simple persistence mechanism for unstructured data that you can certainly use like a key/value store.  A single blob can be up to 1TB in size and each item in the key/value store can be 1MB.  Just like most key/value stores, no schema is needed.  
The service is organized as a series of tables (not at all the same as relational tables), and each table contains one or more entities. Tables can be divided into partitions and each entity has a two-part key that specifies the partition (the partition key) and the entity ID (the row key).  Entities stored in the same partition have the same value for the partition element of this key, but each entity must have a unique row key within a partition. The Table service is optimized for performing range queries, and entities in the same partition are stored in row key order. The data for an entity comprises a set of key/value pairs known as properties. Like other NoSQL databases, the Table service is schema-less, so entities in the same table can have different sets of properties.  
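To make the two-part key concrete, here is a plain-Python illustration of two entities in the same partition of a hypothetical Customers table (all property names are invented):

```python
# Two entities sharing a partition ("PA") but with unique row keys.
# Note they carry different property sets -- the Table service is
# schema-less, so that's allowed.
entity1 = {
    "PartitionKey": "PA",     # groups entities; range-query friendly
    "RowKey": "cust-0001",    # must be unique within the partition
    "Name": "Ann",
    "Balance": 12.50,
}
entity2 = {
    "PartitionKey": "PA",
    "RowKey": "cust-0002",
    "Name": "Bob",
    "SignupYear": 2013,       # a property entity1 doesn't have
}
print(entity1["PartitionKey"], entity1["RowKey"])
```

Entities within a partition are stored in row-key order, which is what makes range queries over a partition cheap.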

Conceptually this seems almost identical to MongoDB.  I'll write about my experiences with MongoDB in a future post.


Hekaton

This is the next post in my NoSQL series.  As I was starting my first NoSQL POC (using SAP HANA), Microsoft announced Hekaton in one of the CTPs for SQL Server 2014.  I was intrigued because it appeared as though Hekaton, an in-memory optimization feature, was basically HANA.  It turns out HANA is much more.  This post has been at least 9 months in the making, and since then everyone in the blogosphere has evaluated and posted about Hekaton, so I'll keep this post really short.  

Hekaton is Greek for a hundred, and that was the targeted performance improvement that Microsoft set out to achieve when building this new technology.  Oracle has an in-memory product called TimesTen and I wonder if the Hekaton name was a bit of one-upmanship.  

Initially I was skeptical that Hekaton was little more than DBCC PINTABLE.  It's more...a lot more.  

In case you've never heard of Hekaton, there are two main features:

  • a table can be declared as in-memory (similar to SAP HANA)
  • stored procedures can be compiled into native DLLs if they only touch in-memory tables.  

Most database managers in existence today (except the "newer" varieties like HANA) were built on the assumption that data lives on rotational media and only small chunks of data will be loaded into memory at any given time. Therefore there is a lot of emphasis on IO operations and latching within the database engine. For instance, when looking up data in SQL Server we traverse a B-Tree structure, which is a rotational media-optimized structure.  Hekaton does not use B-Trees, instead it uses memory pointers to get to the data.  This is orders of magnitude faster.  

Hekaton transactions are run in the equivalent of snapshot isolation level.  New versions of changed data are stored in RAM with a new pointer.  Transactions still get logged to the tran log and data is still persisted to disk, but the disk-based table data for Hekaton tables is only read from disk when the database starts up.  And it is not stored in B-Trees.  Instead, the versioned memory pointers are persisted to disk and on database startup those pointers/versions are re-read into memory.  

You will see errors, just as with other in-memory database managers, if the data grows too large to fit into RAM.  The system does not fall back to traditional disk-based B-Trees.

Other Interesting Features

  • declare the table with DURABILITY = SCHEMA_ONLY and it is non-durable and non-logged.  The data will be gone when the instance restarts (or fails over), but the schema remains.  This is good for ETL and session state information.  
  • If the indexes on these tables are not B-Trees, then what are they?  Hash indexes...therefore all memory-optimized tables must have at least one index.  Indexes are rebuilt on instance restart as the data is streamed into memory.  Indexes are not persisted to disk and are not part of your backup.  
  • No locks are acquired and there are no blocking waits.  In-memory tables use completely optimistic multi-version concurrency control.  

Implementation Features

  • the database must have a filegroup declared with CONTAINS MEMORY_OPTIMIZED_DATA that is used to recover the data.  This makes sense since legacy filegroups are B-Tree-organized.  
  • the tables (or database) must use a Windows BIN2 collation.  
  • tables can have no blobs or XML datatypes, no DML triggers, no FK/check constraints, no identity cols, and no unique indexes other than the PK.  
  • maximum 8 indexes. 
  • The schema cannot be changed once the table is created.  That includes indexes.  
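Pulling those implementation notes together, a hypothetical DDL sketch (the database, filegroup, file, and table names are all invented):

```sql
-- Add the required memory-optimized filegroup and its container.
ALTER DATABASE MyDb ADD FILEGROUP imoltp_fg CONTAINS MEMORY_OPTIMIZED_DATA;
ALTER DATABASE MyDb ADD FILE (NAME = 'imoltp_file', FILENAME = 'C:\Data\imoltp')
    TO FILEGROUP imoltp_fg;

-- A non-durable session-state table with a hash index on the PK.
CREATE TABLE dbo.SessionState (
    SessionId INT NOT NULL
        PRIMARY KEY NONCLUSTERED HASH WITH (BUCKET_COUNT = 1000000),
    Payload VARBINARY(8000)
) WITH (MEMORY_OPTIMIZED = ON, DURABILITY = SCHEMA_ONLY);
```

SCHEMA_ONLY means the rows vanish on restart but the table definition survives, which is exactly what you want for session state.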

There is lots of good information on Hekaton on the internet.  Far more than I can put into a blog post.  This is an interesting development.  

Graph Datastores

This is the next post in my series on NoSQL solutions.  I haven't actually worked with any graph databases in depth because I haven't been presented with any data problems that would warrant the use of a graph data store.  

What are the "right" use cases for a graph database?

  • Mapping applications.  "Find me the shortest path between any two nodes in a connected graph."  This would be difficult to solve algorithmically using an RDBMS.  In something like Google Maps the "shortest path" isn't necessarily the ultimate goal; you may want the "cheapest" path instead (no toll roads, or only 4-lane highways).  
  • Recommendation generation systems.  If you purchase x online, what else have others also purchased?  We can display that for upselling purposes.  
  • Social networking like LinkedIn.  How does a person relate to another person?  This is another example of a graph.  
  • Data mining to find the interactions among nodes.  

Neo4j is the graph data store I've played with.  It uses untyped data, stored much as it would be in a document db...in an ad hoc fashion.  Relationships are declared as edges between nodes.  

