December 19, 2018

Introducing Hive-Kafka integration for real-time Kafka SQL queries

Our last few blogs in the Kafka Analytics blog series focused on the addition of Kafka Streams to HDP and HDF and on how to build, secure, and monitor Kafka Streams apps/microservices. In this blog, we focus on the SQL access pattern for Kafka with the new Kafka Hive integration work.

Kafka SQL – What our Kafka customers have been asking for

Stream processing engines/libraries like Kafka Streams provide a programmatic stream processing access pattern to Kafka. Application developers love this access pattern, but BI developers have quite different analytics requirements, focused on use cases around ad hoc analytics, data exploration, and trend discovery. BI persona requirements for Kafka access include:

  • Treat Kafka topics/streams as tables
  • Support for ANSI SQL
  • Support complex joins (different join keys, multi-way join, join predicate to non-table keys, non-equi joins, multiple joins in the same query)
  • UDF support for extensibility
  • JDBC/ODBC support
  • Creating views for column masking
  • Rich ACL support including column level security

To address these requirements, the new HDP 3.1 release has added a new Hive storage handler for Kafka which allows users to view Kafka topics as Hive tables. This new feature allows BI developers to take full advantage of Hive analytical operations/capabilities, including complex joins, aggregations, window functions, UDFs, pushdown predicate filtering, and more.

Kafka Hive C-A-T (Connect, Analyze, Transform)

The goal of the Hive-Kafka integration is to enable users to quickly connect, analyze, and transform data in Kafka via SQL.

  • Connect: Users will be able to create an external table that maps to a Kafka topic without actually copying or materializing the data to HDFS or any other persistent storage. Using this table, users will be able to run any SQL statement with out-of-the-box authentication and authorization support using Ranger.
  • Analyze: Leverage Kafka time-travel capabilities and offset-based seeks in order to minimize I/O. These capabilities enable both ad hoc queries across time slices in the stream and exactly-once offloading by controlling the position in the stream.
  • Transform: Users will be able to mask, join, aggregate, and change the serialization encoding of the original stream and create a stream persisted in a Kafka topic. Joins can be against any dimension table or any stream. Users will also be able to offload the data from Kafka to the Hive warehouse (e.g., HDFS, S3, etc.), as sketched in the example after this list.
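
As a rough sketch of such a transform-and-offload (the table names, columns, and the drivers dimension table are illustrative, not taken from the original post), a masked and joined copy of the stream can be persisted to an ORC table in the warehouse:

  -- Illustrative sketch: mask a column, join the Kafka-backed table against a
  -- dimension table, and offload the result to an ORC table in the warehouse.
  -- kafka_truck_geo_events is the Kafka-backed table sketched later in this post;
  -- drivers is a hypothetical dimension table.
  CREATE TABLE geo_events_offload STORED AS ORC AS
  SELECT
    mask_hash(CAST(e.driverid AS STRING)) AS driver_hash,  -- column masking via Hive's mask_hash UDF
    d.drivername,
    e.latitude,
    e.longitude
  FROM kafka_truck_geo_events e
  JOIN drivers d ON e.driverid = d.driverid;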

 

The Connect of Kafka Hive C-A-T

To connect to a Kafka topic, execute a DDL to create an external Hive table representing a live view of the Kafka stream. The external table definition is handled by a storage handler implementation called ‘KafkaStorageHandler’. The storage handler relies on two mandatory table properties to map the Kafka topic name and the Kafka broker connection string. The below shows a sample DDL statement (more examples can be found here):
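
A minimal sketch, assuming an illustrative topic and columns named after the truck geo-events example used later in the post:

  CREATE EXTERNAL TABLE kafka_truck_geo_events (
    driverid  INT,
    truckid   INT,
    latitude  DOUBLE,
    longitude DOUBLE,
    eventtype STRING
  )
  STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
  TBLPROPERTIES (
    "kafka.topic" = "kafka_truck_geo_events",        -- mandatory: the Kafka topic to map
    "kafka.bootstrap.servers" = "localhost:9092"     -- mandatory: the Kafka broker connection string
  );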

Recall that records in Kafka are stored as key-value pairs, so the user needs to supply the serialization/deserialization classes to transform the value byte array into a set of columns. The serializer/deserializer is supplied using the table property “kafka.serde.class”. As of today the default is JsonSerDe, and out-of-the-box SerDes are available for formats such as CSV, Avro, and others.
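
For example, to read a CSV-encoded topic instead of the default JSON, the SerDe can be overridden through that table property. This is only a sketch: it uses Hive's standard OpenCSVSerde, and depending on your version the property can also simply be set in the original CREATE TABLE's TBLPROPERTIES.

  ALTER TABLE kafka_truck_geo_events SET TBLPROPERTIES (
    "kafka.serde.class" = "org.apache.hadoop.hive.serde2.OpenCSVSerde"
  );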

In addition to the schema columns defined in the DDL, the storage handler captures metadata columns for the Kafka topic, including partition, timestamp, and offset. These metadata columns allow Hive to optimize queries for “time travel”, partition pruning, and offset-based seeks.
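
For instance, the metadata columns can be selected alongside the payload columns of the illustrative table above:

  SELECT `__partition`, `__offset`, `__timestamp`, driverid, eventtype
  FROM kafka_truck_geo_events
  LIMIT 10;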

Time-Travel, Partition Pruning and Offset Based Seeks: Optimizations for Fast SQL in Kafka

From Kafka release 0.10.1 onwards, every Kafka message has a timestamp associated with it. The semantics of this timestamp are configurable (e.g., the value assigned by the producer, when the leader receives the message, or when a consumer receives the message). Hive adds this timestamp field as a column to the Kafka Hive table. With this column, users can apply filter predicates to time travel (e.g., read only records past a given point in time). This is achieved using the Kafka consumer public API offsetsForTimes, which returns, for each partition, the earliest offset whose timestamp is greater than or equal to the given timestamp. Hive parses the filter expression tree and looks for any predicate of the following form to provide time-based offset optimizations: __timestamp [>=, >, =] constant_int64. This allows optimization for queries like the following:
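
For example, a sketch against the illustrative kafka_truck_geo_events table:

  -- Read only the last 10 minutes of the stream; the __timestamp predicate is
  -- pushed down and translated into an offsetsForTimes seek instead of a full scan.
  SELECT *
  FROM kafka_truck_geo_events
  WHERE `__timestamp` > 1000 * TO_UNIX_TIMESTAMP(CURRENT_TIMESTAMP - INTERVAL '10' MINUTE);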

Customers leverage this powerful optimization by creating time-based views over the data streaming into Kafka; the implementation of this optimization can be found in KafkaScanTrimmer#buildScanForTimesPredicate. Consider, for example, a view over the last 15 minutes of the data streaming into the Kafka topic kafka_truck_geo_events.
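
A sketch of such a view (the view name is illustrative):

  CREATE VIEW truck_geo_events_last_15_min AS
  SELECT *
  FROM kafka_truck_geo_events
  WHERE `__timestamp` > 1000 * TO_UNIX_TIMESTAMP(CURRENT_TIMESTAMP - INTERVAL '15' MINUTE);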

Partition pruning is another powerful optimization. Each Kafka Hive table is partitioned based on the __partition metadata column. Any filter predicate over column __partition can be used to eliminate the unused partitions. See optimization implementation here: KafkaScanTrimmer#buildScanFromPartitionPredicate
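
For example, the following sketch scans only a single partition of the illustrative topic:

  -- Only Kafka partition 0 is scanned; all other partitions are pruned.
  SELECT COUNT(*)
  FROM kafka_truck_geo_events
  WHERE `__partition` = 0;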

Kafka Hive also takes advantage of offset-based seeks, which allow users to seek to a specific offset in the stream. Thus any predicate that can be used as a start point (e.g., __offset > constant_int64) can be used to seek in the stream. Supported operators are =, >, >=, <, <=.
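
For example, a sketch that starts the scan at a given offset within one partition:

  -- Seek directly to offset 100000 in partition 0 instead of scanning
  -- the partition from the beginning.
  SELECT *
  FROM kafka_truck_geo_events
  WHERE `__partition` = 0 AND `__offset` > 100000;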

See optimization implementation here: KafkaScanTrimmer#buildScanFromOffsetPredicate

Implementing Discovery/Trend Analytics Requirements with Kafka Hive Integration

Let’s bring this full circle with a concrete example of using Kafka Hive SQL to implement a series of requirements on streaming data into Kafka.

The below video showcases how to implement each of these requirements.

What’s Next?

In the next installment of the Kafka Analytics Blog series, we will cover the Kafka Druid Integration for fast OLAP style analytics on streaming data into Kafka.

 

 

Comments

buntu says:

Are there any specific versions of Hive and Kafka the storage handler will work with? Thanks!

Slim Bouguerra says:

This integration is part of HDP 3.1: https://hortonworks.com/blog/tag/hdp-3-1/.
If you want to build it yourself, you can do so using https://github.com/apache/hive/tree/master or https://github.com/t3rmin4t0r/hive-kafka-handler. I was able to run it against Hive 2.x.
Let me know if you have more questions.

Frank Kemmer says:

We are using CDH 5.14 (we could migrate to 6.2.0 if necessary). Is it possible to run the Kafka Storage Handler under Cloudera’s Hive, i.e.

* Hive 2.1.1 in CDH 6.2.0
* Hive 1.1.0 in CDH 5.14

Any chances?

I looked into

* https://github.com/t3rmin4t0r/hive-kafka-handler

but it looks like it is configured for Hive 3.1.0

Thanks for any help in advance.

kafka-user says:

Interesting. Is there an architecture document that describes how the queries are executed in Hive in a distributed cluster? I am interested to know how the aggregations work, the intermediate state storage, the reliability guarantees, and so on.

Slim Bouguerra says:

Dear Kafka user.
Yes, there is a document detailing this; it can be found here: https://docs.google.com/document/d/1UcXq-rrrc6cBR4MEDLOwazUhGphniJErhrwgrLDa0_I
In a nutshell, each Kafka partition will be mapped to a Hive input split (i.e., a map task, so to speak); from this point the regular Hive operator execution takes over, and it is like any other Hive table.
When querying Kafka there is no intermediate state to store.
Let me know if you have more questions.

Guilherme Dio says:

How does Hive-Kafka differ from Confluent’s KSQL (Kafka SQL)?

Slim Bouguerra says:

This integration is very different from KSQL.
– The primary use case here is to allow users to unleash full SQL query use cases against any Kafka topic. https://github.com/apache/hive/tree/master/kafka-handler#query-table
– You can use it to atomically move data in and out of Kafka itself. https://github.com/apache/hive/tree/master/kafka-handler#query-table
– You can query the Kafka stream as part of the entire data warehouse, alongside ORC/Parquet tables, Druid tables, HDFS, S3, etc.
– There are many other points that could be added here, but we will keep them for another blog post.

Shubham Aggarwal says:

How is this different from what PrestoDB offers in its Kafka connector? Refer here: https://prestodb.io/docs/current/connector/kafka-tutorial.html. It offers:
• ANSI SQL is supported
• Real time data queries

Slim Bouguerra says:

What we are proposing here is far more advanced; see the time travel section. As far as I can tell, the Presto adapter does not push down any predicate on time in conjunction with predicates on offsets and partition IDs, so with Presto you will always be doing a full stream scan.
The current state of the Hive-Kafka adapter seeks to the actual offset that satisfies the filter predicate, say the last 10 minutes’ worth of data.

Slim Bouguerra says:

As you can see here, the Presto adapter doesn’t even expose the Kafka native timestamp column, so you cannot do any time-based travel.
https://prestodb.github.io/docs/current/connector/kafka.html#internal-columns

Ladislav Jech says:

So I have the following complex schema in my Kafka:
root object
  attribute a
  attribute b
  nested object
    attribute c
    attribute d

So do I create 2 tables, or do I create one super large table with a, b, c, d, and Hive can handle this?
