Wednesday, 22 June 2016

Avro2Orc

Avro Schema
{
  "namespace" : "schema1.avro",
  "type"     :  "record",
  "name"     :  "BorrowerCalValues_Record",
  "fields"   :  [
       {"name": "SellerLoanIdentifier", "type": ["string", "null"]},
       {"name": "SellerPartyRoleIdentifier", "type": ["string", "null"]},
       {"name": "UniqueClientDealIdentifier", "type": ["string", "null"]},
       {"name": "BorrowerClassificationType", "type": ["string", "null"]},
       {"name": "BorrowerIdentifier", "type": ["string", "null"]},
       {"name": "BorrowerTaxpayerIdentifier", "type":["string", "null"]},
       {"name": "BorrowerCalculatedValueName", "type":["string", "null"]},
       {"name": "BorrowerCalculatedValue", "type": ["string", "null"]}
   ]
}
Compiling Avro Schema & Code generation
java -jar C:\Users\user_id\.m2\repository\org\apache\avro\avro-tools\1.7.7\avro-tools-1.7.7.jar compile schema BorrowerCalculatedValues.avsc .

pom dependencies
  <dependency>
   <groupId>org.apache.crunch</groupId>
   <artifactId>crunch-core</artifactId>
   <version>0.14.0</version>
  </dependency>
  <dependency>
   <groupId>org.apache.crunch</groupId>
   <artifactId>crunch-hive</artifactId>
   <version>0.14.0</version>
  </dependency>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-core</artifactId>
   <version>1.2.1</version>
  </dependency>
  <dependency>
   <groupId>org.apache.hive</groupId>
   <artifactId>hive-exec</artifactId>
   <version>1.2.1</version>
  </dependency>

Generate Avro Data

package CreateOrcFile;

import java.io.File;
import java.io.IOException;

import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;

import CreateOrcFile.BorrowerCalValues_Record;

public class GenerateAvroData {
 public static void main(String[] args) throws IOException {
  BorrowerCalValues_Record rec1 = new BorrowerCalValues_Record();
  rec1.setBorrowerCalculatedValue("111");
  rec1.setBorrowerCalculatedValueName("rec1");
  rec1.setBorrowerClassificationType("222");
  rec1.setBorrowerIdentifier("333");
  rec1.setBorrowerTaxpayerIdentifier("333");
  rec1.setSellerLoanIdentifier("444");
  

  // Alternate constructor
  BorrowerCalValues_Record rec2 = new BorrowerCalValues_Record("222","rec2","333","444","555","666","777","888");


  // Serialize rec1 and rec2 to disk
  File file = new File("BorrowerCalValues.avro");
  DatumWriter<BorrowerCalValues_Record> userDatumWriter = new SpecificDatumWriter<>(
    BorrowerCalValues_Record.class);
  DataFileWriter<BorrowerCalValues_Record> dataFileWriter = new DataFileWriter<>(
    userDatumWriter);

  dataFileWriter.create(rec1.getSchema(), file);

  dataFileWriter.append(rec1);
  dataFileWriter.append(rec2);
  dataFileWriter.close();
 }
}

Deserialize Avro Data
package CreateOrcFile;

import java.io.File;
import java.io.IOException;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;

import CreateOrcFile.BorrowerCalValues_Record;

public class DeserializeAvro {
 public static void main(String[] args) throws IOException {
  File file = new File("BorrowerCalValues.avro");  
  DatumReader<BorrowerCalValues_Record> userDatumReader = new SpecificDatumReader<>(BorrowerCalValues_Record.class);
  DataFileReader<BorrowerCalValues_Record> dataFileReader = new DataFileReader<>(file, userDatumReader);
  BorrowerCalValues_Record emp = null;
  while (dataFileReader.hasNext()) 
  {
   emp = dataFileReader.next();   
   System.out.println(emp);
  }
  dataFileReader.close();
 }
}
Avro2Orc
package CreateOrcFile;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.crunch.types.orc.OrcUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hive.ql.io.orc.Writer;

import java.io.File;
import java.io.IOException;

public class App {

 
 public static void main(String[] args) throws IOException {
  App test = new App();
  File file = new File("BorrowerCalValues.avro");  
  DatumReader<BorrowerCalValues_Record> userDatumReader = new SpecificDatumReader<>(BorrowerCalValues_Record.class);
  DataFileReader<BorrowerCalValues_Record> dataFileReader = new DataFileReader<>(file, userDatumReader);
  BorrowerCalValues_Record rec = null;
  Configuration conf = new Configuration();
  
  Path tempPath = new Path("c:/temp/test3.orc");
  // The ORC struct fields must match the order of the values passed to OrcUtils.createOrcStruct below.
  String typeStr = "struct<BorrowerCalculatedValue:string,BorrowerCalculatedValueName:string,"
    + "BorrowerClassificationType:string,BorrowerIdentifier:string,"
    + "BorrowerTaxpayerIdentifier:string,SellerLoanIdentifier:string>";
  TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeStr);
  ObjectInspector inspector = OrcStruct.createObjectInspector(typeInfo);
  
  Writer writer = OrcFile.createWriter(tempPath, OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000));
  
  
  
  while (dataFileReader.hasNext()) 
  {
   rec = dataFileReader.next();   
   System.out.println(rec);
   OrcStruct orcLine = test.getOrcLine(rec,typeInfo);
   writer.addRow(orcLine);
  }
  writer.close();
  dataFileReader.close();
  
 }

 public OrcStruct getOrcLine(BorrowerCalValues_Record input,TypeInfo typeInfo_) throws IOException {


  OrcStruct orcLine = OrcUtils.createOrcStruct(
    typeInfo_,
    new Text(input.getBorrowerCalculatedValue().toString()),
    new Text(input.getBorrowerCalculatedValueName().toString()),
    new Text(input.getBorrowerClassificationType().toString()),
    new Text(input.getBorrowerIdentifier().toString()),
    new Text(input.getBorrowerTaxpayerIdentifier().toString()),
    new Text(input.getSellerLoanIdentifier().toString())
    );
  
  return orcLine;
  
 }
}

Load ORC file into hive table
CREATE TABLE test_orc_file(
BorrowerCalculatedValue VARCHAR(255),
BorrowerCalculatedValueName VARCHAR(255),
BorrowerClassificationType VARCHAR(255),
BorrowerIdentifier VARCHAR(255),
BorrowerTaxpayerIdentifier VARCHAR(255),
SellerLoanIdentifier VARCHAR(255)
) STORED AS ORC; 


LOAD DATA LOCAL INPATH '/tmp/test3.orc' INTO TABLE test_orc_file;
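
LOAD DATA does not convert anything; it simply moves the file into the table's storage location, so the ORC struct written by the Java code above must line up with the column list in the DDL. A quick sanity check from the Hive shell:

SELECT * FROM test_orc_file LIMIT 10;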

HBase Summary

Data Modeling Overview

HBase is different from an RDBMS in that its data is organized into cells and column families.

Unlike an RDBMS, HBase addresses a value by row key, column family, column qualifier and timestamp.


A dimension that is easy to miss is the timestamp associated with each value in a cell; a cell can hold multiple timestamped versions.

A region server hosts multiple column families, and the data for each column family is stored in HFiles.

Best Practices

If the row key design avoids hotspotting, data is distributed evenly across the cluster.

Note that the row key is repeated with every column and cell, so it occupies a significant amount of storage; keeping row keys (and column family names) short therefore matters.

Securing HBase


Server side configuration:
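
A minimal server-side sketch, assuming Kerberos-based security (values are illustrative; principal and keytab settings are omitted):

<!-- sketch only: enable Kerberos authentication and authorization on master/region servers -->
<property>
 <name>hbase.security.authentication</name>
 <value>kerberos</value>
</property>
<property>
 <name>hbase.security.authorization</name>
 <value>true</value>
</property>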

Client side configuration:
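
A minimal client-side sketch under the same Kerberos assumption:

<!-- sketch only: the client must use the same authentication mechanism as the servers -->
<property>
 <name>hbase.security.authentication</name>
 <value>kerberos</value>
</property>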

MapReduce Integration with HBase

The bin/hbase mapredcp command returns the classpath of the HBase dependencies needed by MapReduce jobs.
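
A typical invocation looks something like the following (the jar and driver class are placeholders, not from this post):

HADOOP_CLASSPATH=$(bin/hbase mapredcp) hadoop jar MyJob.jar MyJobDriver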

HBase is atomic at the row level and strongly consistent (not eventually consistent).

Standalone (local) mode is ideal for local testing.

Installing HBase in Local Mode

Set hbase.rootdir and hbase.zookeeper.property.dataDir in conf/hbase-site.xml so that HBase writes its data somewhere other than /tmp.
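
A minimal conf/hbase-site.xml sketch (the paths are placeholders):

<configuration>
 <!-- where HBase writes its data, instead of /tmp -->
 <property>
  <name>hbase.rootdir</name>
  <value>file:///home/hbaseuser/hbase</value>
 </property>
 <!-- where the embedded ZooKeeper keeps its data -->
 <property>
  <name>hbase.zookeeper.property.dataDir</name>
  <value>/home/hbaseuser/zookeeper</value>
 </property>
</configuration>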

The bin/start-hbase.sh command starts HBase.

The bin/stop-hbase.sh command stops HBase.

An HBase cluster can have up to nine backup masters.
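
For local experimentation they can be started with the bundled script; something like the following starts three backup masters at port offsets 2, 3 and 5 (the offsets are arbitrary):

bin/local-master-backup.sh start 2 3 5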

HBase Web-Based Management Console

Using the HBase shell

Make sure HBase is running before starting the shell; the bin/hbase shell command starts it.
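
A few basic shell commands as a quick example:

create 'test', 'cf'                    # create table 'test' with one column family 'cf'
put 'test', 'row1', 'cf:a', 'value1'   # write one cell
get 'test', 'row1'                     # read one row
scan 'test'                            # read the whole table
disable 'test'
drop 'test'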

Using HBase as a Data Sink for MapReduce Jobs


TableMapReduceUtil is an HBase-specific utility class that sets up the job configuration needed to read from or write to HBase.
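
A rough sketch of the sink side (MyMapper and MyTableReducer are hypothetical user classes, and summary_table is a placeholder table name):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Sketch only: MyMapper emits Text/IntWritable; MyTableReducer extends TableReducer and emits Put objects.
public class SinkJobDriver {
 public static void main(String[] args) throws Exception {
  Configuration conf = HBaseConfiguration.create();
  Job job = Job.getInstance(conf, "hbase-sink-example");
  job.setJarByClass(SinkJobDriver.class);

  FileInputFormat.addInputPath(job, new Path(args[0])); // plain HDFS input
  job.setMapperClass(MyMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);

  // Wires the reduce phase to write into the HBase table via TableOutputFormat.
  TableMapReduceUtil.initTableReducerJob(
    "summary_table",       // placeholder output table
    MyTableReducer.class,  // hypothetical TableReducer subclass
    job);

  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}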


Using HBase as a Data Source for MapReduce Jobs

TableMapReduceUtil.initTableMapperJob takes the name of the HBase table to read from, a Scan (which may contain filters), the mapper class, and the mapper's output key and value classes (for example ImmutableBytesWritable.class and IntWritable.class).
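
A rough sketch of the source side (MyTableMapper and source_table are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;

// Sketch only: MyTableMapper is a hypothetical TableMapper<ImmutableBytesWritable, IntWritable>.
public class SourceJobDriver {
 public static void main(String[] args) throws Exception {
  Configuration conf = HBaseConfiguration.create();
  Job job = Job.getInstance(conf, "hbase-source-example");
  job.setJarByClass(SourceJobDriver.class);

  Scan scan = new Scan();
  scan.setCaching(500);        // rows fetched per RPC; a common starting point
  scan.setCacheBlocks(false);  // recommended for full-table MapReduce scans

  TableMapReduceUtil.initTableMapperJob(
    "source_table",                // placeholder HBase table to read from
    scan,                          // scan, optionally with filters
    MyTableMapper.class,           // hypothetical mapper class
    ImmutableBytesWritable.class,  // mapper output key class
    IntWritable.class,             // mapper output value class
    job);

  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}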

Bulk Loading Data
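
The usual flow is to generate HFiles with a MapReduce job and then hand them to the table, for example with ImportTsv and completebulkload (table and path names below are placeholders):

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
 -Dimporttsv.columns=HBASE_ROW_KEY,cf:col1 \
 -Dimporttsv.bulk.output=/tmp/bulk_output \
 my_table /tmp/input.tsv

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/bulk_output my_table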

Splitting Map Tasks when Sourcing an HBase Table

Accessing Other HBase Tables within a MapReduce Job
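
One common pattern, shown as a sketch (lookup_table and the mapper are hypothetical): open a connection to the second table in setup() and close it in cleanup():

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;

// Sketch only: looks up a row in a second table for every row scanned from the source table.
public class LookupMapper extends TableMapper<ImmutableBytesWritable, IntWritable> {
 private Connection connection;
 private Table lookupTable;

 @Override
 protected void setup(Context context) throws IOException {
  connection = ConnectionFactory.createConnection(context.getConfiguration());
  lookupTable = connection.getTable(TableName.valueOf("lookup_table")); // placeholder name
 }

 @Override
 protected void map(ImmutableBytesWritable rowKey, Result value, Context context)
   throws IOException, InterruptedException {
  Result lookup = lookupTable.get(new Get(rowKey.get()));  // side lookup against the other table
  context.write(rowKey, new IntWritable(lookup.isEmpty() ? 0 : 1));
 }

 @Override
 protected void cleanup(Context context) throws IOException {
  lookupTable.close();
  connection.close();
 }
}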

Taking a Snapshot
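
From the HBase shell, for example (table and snapshot names are placeholders):

snapshot 'my_table', 'my_table_snapshot_20160622'
list_snapshots
clone_snapshot 'my_table_snapshot_20160622', 'my_table_copy'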