Processing big data with modern applications: Hadoop as DWH backend at Pro7
Dr. Kathrin Spreyer, Big data engineer
GridKa School Karlsruhe, 02.09.2014

Outline
1. Relational DWH
2. Data integration with PDI
3. Behind the scenes: Hadoop
4. Ingestion with Flume
5. Hive for the business user + for Reporting

Architecture
Approach: hybrid system of a relational database and a Hadoop cluster

Relational DWH

Data sources
1. DWH integrating reach, marketing revenue and transaction data
[Diagram: ProSiebenSat.1 Digital as a key driver of the digital strategy: Online, TV channels, ProSiebenSat.1 Network, external clients]

Data sources
Metrics:
1. MyVideo Apache logs: VV, VT, ads, player errors, ...
2. Webtrekk: VV, VT, ads
3. DfP (ad server): ads
4. Google Analytics: VV, PI, visits, visitors
5. social networks: posts, comments, likes
6. Conviva: bit rate, video startup time, ...
7. ...

Data model
1. relational data model (star schema)

Reporting
1. Excel at first
2. by now: Business Objects
3. recipients: BI, Controlling, Management, ...

Data integration

Data integration
1. ETL tool Pentaho Data Integration (PDI)
   1. data acquisition
   2. validation and cleansing
   3. aggregation
   4. integration into fact tables

Data integration
1. connectors to common data and storage formats
   1. files: CSV, JSON, XML, ...
   2. RDBMS: 40+ connection types
   3. big data: HDFS, Cassandra, HBase, MongoDB, ...

Pentaho Data Integration (Pentaho MapReduce)

Hadoop backend

Processing with MapReduce
1. aggregation of raw data
2. transparent parallelisation
3. horizontal scaling

Java: Mapper and Reducer

public class WebtrekkEventMapper
    extends Mapper<Text, Text, Text, IntWritable> {

  @Override
  protected void map( Text key, Text value, Context context )
      throws IOException, InterruptedException {
    // key contains entire record
    String[] fields = key.toString().split( ";" );
    // extract relevant information
    String eventname = fields[12];
    // emit output key and count
    context.write( new Text( eventname ), new IntWritable( 1 ));
  }
}

public class IntSumReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  protected void reduce( Text key, Iterable<IntWritable> values, Context context )
      throws IOException, InterruptedException {
    int sum = 0;
    for ( IntWritable partialCount : values ) {
      sum += partialCount.get();
    }
    context.write( key, new IntWritable( sum ) );
  }
}
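The deck shows only the Mapper and the Reducer. For orientation, here is a minimal driver sketch that could wire the two classes into a Hadoop job; the class name WebtrekkEventCountDriver, the Hadoop 2.x API, the choice of KeyValueTextInputFormat (so each record arrives as a Text key, matching the mapper signature) and the use of the reducer as a combiner are assumptions, not taken from the slides.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WebtrekkEventCountDriver {

  public static void main( String[] args ) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance( conf, "webtrekk event count" );
    job.setJarByClass( WebtrekkEventCountDriver.class );

    // Text/Text input; with no tab in the semicolon-separated records,
    // the whole line ends up in the key, as the mapper expects
    job.setInputFormatClass( KeyValueTextInputFormat.class );

    job.setMapperClass( WebtrekkEventMapper.class );
    job.setCombinerClass( IntSumReducer.class );  // sum is associative, safe as combiner
    job.setReducerClass( IntSumReducer.class );

    job.setOutputKeyClass( Text.class );
    job.setOutputValueClass( IntWritable.class );

    FileInputFormat.addInputPath( job, new Path( args[0] ) );
    FileOutputFormat.setOutputPath( job, new Path( args[1] ) );

    System.exit( job.waitForCompletion( true ) ? 0 : 1 );
  }
}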
PDI: Driver, Mapper and Reducer

Pentaho MapReduce
1. supports all regular PDI steps
2. (almost) no Java expertise required
3. limited optimization options

Java MapReduce
1. native Hadoop API
2. supports all Hadoop features
3. more control, optimization options

Data ingestion

Ingestion with Flume
1. import via FTP: up to 24-hour delay until data is accessible in the DWH
2. Apache Flume: continuous data stream
3. can deal with late arrivals, too (sorts events into directories according to their timestamp)

Data volume
1. via Flume: 20 TB per year (before replication)
2. via FTP: 21.5 TB per year (before replication)
3. archival: 500 GB per year (before replication)
4. total: 40 TB per year before replication (120 TB replicated), as of late 2013

Cluster size
1. dev: 4 DN (DataNodes) + NN (NameNode) + SN (Secondary NameNode) + DB + admin
2. prod: 6 DN + NN + SN + DB + 2 admin
3. HDFS capacity: 185 TB (after replication)
4. scalable (as of late 2013)

Hive for the business user

Apache Hive
1. familiar interface: SQL
2. exploration of new metrics
3. analysis of data anomalies
4. integration with reporting framework (BO)

Example query (a JDBC version of this query is sketched at the end of the deck)
SELECT mandant, day, count(*)
FROM webtrekk_cust_para_click_2
WHERE mandant = 'sat1'
AND day BETWEEN '2013-01-01' AND '2013-01-31'
GROUP BY sid, request_id, times, day, mandant
HAVING count(*) > 1

Hive access to raw data
Approach: hybrid system of a relational database and a Hadoop cluster

Summary
Approach: hybrid system of a relational database and a Hadoop cluster

Questions?
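Hive over JDBC (sketch)
A minimal sketch of how the duplicate check from the "Example query" slide could be run programmatically; it assumes a HiveServer2 endpoint is available, and the hostname hiveserver, port 10000, database default and the empty credentials are placeholders, not details from the deck.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveDuplicateCheck {

  public static void main( String[] args ) throws Exception {
    // Hive JDBC driver for HiveServer2
    Class.forName( "org.apache.hive.jdbc.HiveDriver" );

    // hostname, port, database and credentials are placeholders
    try ( Connection con = DriverManager.getConnection(
              "jdbc:hive2://hiveserver:10000/default", "user", "" );
          Statement stmt = con.createStatement() ) {

      // same duplicate-detection query as on the "Example query" slide
      ResultSet rs = stmt.executeQuery(
          "SELECT mandant, day, count(*) "
        + "FROM webtrekk_cust_para_click_2 "
        + "WHERE mandant = 'sat1' "
        + "AND day BETWEEN '2013-01-01' AND '2013-01-31' "
        + "GROUP BY sid, request_id, times, day, mandant "
        + "HAVING count(*) > 1" );

      while ( rs.next() ) {
        System.out.println( rs.getString( 1 ) + "\t"
            + rs.getString( 2 ) + "\t" + rs.getLong( 3 ) );
      }
    }
  }
}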