Wednesday, 22 June 2016

Avro2Orc
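This post walks through a small end-to-end example: define an Avro schema, generate a couple of records and serialize them, convert the Avro data to an ORC file using the Hive ORC writer (with the Crunch ORC helper), and load the resulting file into a Hive table.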

Avro Schema
{
  "namespace" : "schema1.avro",
  "type"     :  "record",
  "name"     :  "BorrowerCalValues_Record",
  "fields"   :  [
       {"name": "SellerLoanIdentifier", "type": ["string", "null"]},
       {"name": "SellerPartyRoleIdentifier", "type": ["string", "null"]},
       {"name": "UniqueClientDealIdentifier", "type": ["string", "null"]},
       {"name": "BorrowerClassificationType", "type": ["string", "null"]},
       {"name": "BorrowerIdentifier", "type": ["string", "null"]},
       {"name": "BorrowerTaxpayerIdentifier", "type":["string", "null"]},
       {"name": "BorrowerCalculatedValueName", "type":["string", "null"]},
       {"name": "BorrowerCalculatedValue", "type": ["string", "null"]}
   ]
}
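Every field is a union of string and null, so any of them may be left unset. Before running code generation it can be useful to confirm that the .avsc file parses; here is a minimal sketch (the ValidateSchema class name is just for illustration and is not part of the original project):

package CreateOrcFile;

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;

public class ValidateSchema {
 public static void main(String[] args) throws IOException {
  // Parse the schema file and print its fields as a quick sanity check
  Schema schema = new Schema.Parser().parse(new File("BorrowerCalculatedValues.avsc"));
  System.out.println(schema.getFields());
 }
}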
Compiling Avro Schema & Code generation
java -jar C:\Users\user_id\.m2\repository\org\apache\avro\avro-tools\1.7.7\avro-tools-1.7.7.jar 
compile schema BorrowerCalculatedValues.avsc .
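avro-tools writes the generated BorrowerCalValues_Record.java into a directory tree that matches the schema namespace (CreateOrcFile/ with the namespace above, under the "." output path given in the command); copy the generated source into the project so the classes below compile against it.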

pom dependencies
  <dependency>
   <groupId>org.apache.crunch</groupId>
   <artifactId>crunch-core</artifactId>
   <version>0.14.0</version>
  </dependency>
  <dependency>
   <groupId>org.apache.crunch</groupId>
   <artifactId>crunch-hive</artifactId>
   <version>0.14.0</version>
  </dependency>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-core</artifactId>
   <version>1.2.1</version>
  </dependency>
  <dependency>
   <groupId>org.apache.hive</groupId>
   <artifactId>hive-exec</artifactId>
   <version>1.2.1</version>
  </dependency>
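crunch-core and crunch-hive supply the OrcUtils helper used in the conversion code, hive-exec supplies the ORC writer classes (org.apache.hadoop.hive.ql.io.orc), and hadoop-core supplies Configuration, Path and the Writable wrappers.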

Generate Avro Data

package CreateOrcFile;

import java.io.File;
import java.io.IOException;

import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;

import CreateOrcFile.BorrowerCalValues_Record;

public class GenerateAvroData {
 public static void main(String[] args) throws IOException {
  BorrowerCalValues_Record rec1 = new BorrowerCalValues_Record();
  rec1.setBorrowerCalculatedValue("111");
  rec1.setBorrowerCalculatedValueName("rec1");
  rec1.setBorrowerClassificationType("222");
  rec1.setBorrowerIdentifier("333");
  rec1.setBorrowerTaxpayerIdentifier("333");
  rec1.setSellerLoanIdentifier("444");
  

  // Alternate constructor: arguments follow the schema field order
  // (SellerLoanIdentifier, SellerPartyRoleIdentifier, UniqueClientDealIdentifier, ...)
  BorrowerCalValues_Record rec2 = new BorrowerCalValues_Record("222","rec2","333","444","555","666","777","888");


  // Serialize rec1 and rec2 to an Avro container file on disk
  File file = new File("BorrowerCalValues.avro");
  DatumWriter<BorrowerCalValues_Record> userDatumWriter =
    new SpecificDatumWriter<BorrowerCalValues_Record>(BorrowerCalValues_Record.class);
  DataFileWriter<BorrowerCalValues_Record> dataFileWriter =
    new DataFileWriter<BorrowerCalValues_Record>(userDatumWriter);

  // The container file embeds the schema, so readers do not need the .avsc
  dataFileWriter.create(rec1.getSchema(), file);

  dataFileWriter.append(rec1);
  dataFileWriter.append(rec2);
  dataFileWriter.close();
 }
}
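Running GenerateAvroData writes BorrowerCalValues.avro, containing the embedded schema and the two records, into the working directory.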

Deserialize Avro Data
package CreateOrcFile;

import java.io.File;
import java.io.IOException;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;

import CreateOrcFile.BorrowerCalValues_Record;

public class DeserializeAvro {
 public static void main(String[] args) throws IOException {
  File file = new File("BorrowerCalValues.avro");
  DatumReader<BorrowerCalValues_Record> userDatumReader =
    new SpecificDatumReader<BorrowerCalValues_Record>(BorrowerCalValues_Record.class);
  DataFileReader<BorrowerCalValues_Record> dataFileReader =
    new DataFileReader<BorrowerCalValues_Record>(file, userDatumReader);
  BorrowerCalValues_Record rec = null;
  while (dataFileReader.hasNext())
  {
   // Specific records print as JSON, so this dumps each record to stdout
   rec = dataFileReader.next();
   System.out.println(rec);
  }
  dataFileReader.close();
 }
}
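Both records should print back as JSON text, which confirms the Avro file is readable before converting it to ORC.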
Avro2Orc
package CreateOrcFile;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.crunch.types.orc.OrcUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;

import java.io.File;
import java.io.IOException;

public class App {

 
 public static void main(String[] args) throws IOException {
  App test = new App();
  File file = new File("BorrowerCalValues.avro");
  DatumReader<BorrowerCalValues_Record> userDatumReader =
    new SpecificDatumReader<BorrowerCalValues_Record>(BorrowerCalValues_Record.class);
  DataFileReader<BorrowerCalValues_Record> dataFileReader =
    new DataFileReader<BorrowerCalValues_Record>(file, userDatumReader);
  BorrowerCalValues_Record rec = null;
  Configuration conf = new Configuration();

  Path tempPath = new Path("c:/temp/test3.orc");
  // The ORC struct definition: field names, types and order must line up with
  // the values passed to OrcUtils.createOrcStruct() in getOrcLine() and with
  // the Hive table definition further below.
  String typeStr = "struct<borrowercalculatedvalue:string,"
    + "borrowercalculatedvaluename:string,"
    + "borrowerclassificationtype:string,"
    + "borroweridentifier:string,"
    + "borrowertaxpayeridentifier:string,"
    + "sellerloanidentifier:string>";
  TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeStr);
  ObjectInspector inspector = OrcStruct.createObjectInspector(typeInfo);

  Writer writer = OrcFile.createWriter(tempPath,
    OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000).bufferSize(10000));
  
  
  
  while (dataFileReader.hasNext())
  {
   rec = dataFileReader.next();
   System.out.println(rec);
   // Convert one Avro record into one ORC row and append it to the file
   OrcStruct orcLine = test.getOrcLine(rec, typeInfo);
   writer.addRow(orcLine);
  }
  dataFileReader.close();
  writer.close();

 }

 public OrcStruct getOrcLine(BorrowerCalValues_Record input, TypeInfo typeInfo_) throws IOException {

  // Build one ORC row; the values must be supplied in the same order as the
  // fields declared in the struct type string. The Avro getters return
  // CharSequence, so each value is wrapped in a hadoop Text. Fields left
  // null in the Avro record would need an explicit null check here.
  OrcStruct orcLine = OrcUtils.createOrcStruct(
    typeInfo_,
    new Text(input.getBorrowerCalculatedValue().toString()),
    new Text(input.getBorrowerCalculatedValueName().toString()),
    new Text(input.getBorrowerClassificationType().toString()),
    new Text(input.getBorrowerIdentifier().toString()),
    new Text(input.getBorrowerTaxpayerIdentifier().toString()),
    new Text(input.getSellerLoanIdentifier().toString())
    );

  return orcLine;
 }
}
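To confirm the ORC file is well-formed before moving it to the cluster, it can be read back with the same hive-exec ORC API. A minimal sketch (the ReadOrcFile class name is just for illustration):

package CreateOrcFile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;

public class ReadOrcFile {
 public static void main(String[] args) throws IOException {
  Configuration conf = new Configuration();
  // Open the file written by App and print each row (an OrcStruct) as text
  Reader reader = OrcFile.createReader(new Path("c:/temp/test3.orc"), OrcFile.readerOptions(conf));
  RecordReader rows = reader.rows();
  Object row = null;
  while (rows.hasNext()) {
   row = rows.next(row);
   System.out.println(row);
  }
  rows.close();
 }
}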

Load ORC File into Hive Table
CREATE TABLE test_orc_file(
BorrowerCalculatedValue VARCHAR(255),
BorrowerCalculatedValueName VARCHAR(255),
BorrowerClassificationType VARCHAR(255),
BorrowerIdentifier VARCHAR(255),
BorrowerTaxpayerIdentifier VARCHAR(255),
SellerLoanIdentifier VARCHAR(255)
) STORED AS ORC; 


LOAD DATA LOCAL INPATH '/tmp/test3.orc' INTO TABLE test_orc_file;
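Note that the writer above created the file at c:/temp/test3.orc; copy it to /tmp on the machine where the Hive CLI runs (or adjust the LOCAL INPATH) before loading. The table columns are declared in the same order as the ORC struct fields, and a simple SELECT * FROM test_orc_file; should then return the two rows.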
