Splice Machine MapReduce API

The Splice Machine MapReduce API provides a simple programming interface to the MapReduce framework that is integrated into Splice Machine. You can use MapReduce to import data, export data, or for purposes such as implementing machine learning algorithms. One likely scenario for the Splice Machine MapReduce API is a customer who already has a Hadoop cluster, wants to use Splice Machine as a transactional database, and needs to keep running existing batch MapReduce jobs.


This topic summarizes the key Java classes in the API and presents an example of using the MapReduce API.

Splice Machine MapReduce API Classes

The Splice Machine MapReduce API includes the following key classes:

Class                Description
SpliceJob            Creates a transaction for the MapReduce job.
SMInputFormat        Creates an object that:
                       • uses Splice Machine to scan the table and decode the data
                       • returns an ExecRow (typed data) object
SMOutputFormat       Creates an object that:
                       • writes to a buffered cache
                       • dumps the cache into Splice Machine
                       • returns an ExecRow (typed data) object
SpliceMapReduceUtil  A helper class for writing MapReduce jobs in Java; use this class to initiate a mapper or reducer job, set the number of reducers, and add dependency jars.

Each transaction must manage its own commit and rollback operations.

For information about and examples of using Splice Machine with HCatalog, see the Using Splice Machine with HCatalog topic.

Example of Using the Splice Machine MapReduce API

This topic demonstrates the Splice Machine MapReduce API (com.splicemachine.mrio.api) with a simple word count program that retrieves data from an input table, counts the occurrences of each word's initial character, and writes the results to an output table.

  1. Define your input and output tables:

    First, assign the name of the Splice Machine database table from which you want to retrieve data to a variable, and then assign a name for your output table to another variable:

    String inputTableName  = "WIKIDATA";
    String outputTableName = "USERTEST";
    

    You can specify table names using the <schemaName>.<tableName> format; if you don’t specify a schema name, the default schema is assumed.
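
    For example, to read from a table in a specific schema (SPLICE is used here purely as an illustrative schema name), you could write:

    String inputTableName  = "SPLICE.WIKIDATA";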

  2. Create a new job instance:

    You need to create a new job instance and assign a name to it:

    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config, "WordCount");
    
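    On recent Hadoop releases, the Job constructor shown above is deprecated; if you are on such a release, the equivalent factory method is:

    Job job = Job.getInstance(config, "WordCount");
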
  3. Initialize your mapper job:

    We initialize our sample job using the initTableMapperJob method of the SpliceMapReduceUtil helper class:

    SpliceMapReduceUtil.initTableMapperJob(
        inputTableName,        // input Splice Machine database table
        scan,                  // a Scan instance to control column family and attribute selection
        MyMapper.class,        // the mapper class
        Text.class,            // the mapper output key class
        IntWritable.class,     // the mapper output value class
        job,
        true,                  // add dependency jars
        SMInputFormat.class);  // the Splice Machine input format
    
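    The scan instance passed to initTableMapperJob must be created before the call above. A minimal sketch, using standard HBase Scan settings commonly recommended for MapReduce scans:

    Scan scan = new Scan();
    scan.setCaching(500);        // fetch 500 rows per RPC round trip
    scan.setCacheBlocks(false);  // don't fill the block cache during a full scan
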
  4. Retrieve values within your map function:

    Our sample map function retrieves and parses a single row, extracts the word from its first column, and emits the word's initial character with a count of 1:

    public void map(ImmutableBytesWritable row, ExecRow value, Context context)
                    throws InterruptedException, IOException {
        if (value != null) {
            String word = null;
            try {
                DataValueDescriptor[] dataValDesc = value.getRowArray();
                if (dataValDesc[0] != null) {
                    word = dataValDesc[0].getString();
                }
            } catch (StandardException e) {
                e.printStackTrace();
            }
            if (word != null) {
                // Emit the word's first character as the key, with a count of 1
                Text key = new Text(word.charAt(0) + "");
                IntWritable val = new IntWritable(1);
                context.write(key, val);
            }
        }
    }
    
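    For context, the map function above belongs in a mapper class along these lines; the base class and type parameters are an assumption inferred from the method signature:

    public static class MyMapper
            extends Mapper<ImmutableBytesWritable, ExecRow, Text, IntWritable> {
        // map(...) as shown above
    }
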
  5. Manipulate and save the value with the reduce function:

    Our sample reduce function sums the counts for each key, creates an ExecRow, and fills in the row with the execRow.setRowArray method:

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
                        throws IOException, InterruptedException {

        Iterator<IntWritable> it = values.iterator();
        ExecRow execRow = new ValueRow(2);
        int sum = 0;
        String word = key.toString();
        // Sum the counts emitted by the mappers for this key
        while (it.hasNext()) {
            sum += it.next().get();
        }
        try {
            DataValueDescriptor[] dataValDescs = {new SQLVarchar(word), new SQLInteger(sum)};
            execRow.setRowArray(dataValDescs);
            context.write(new ImmutableBytesWritable(Bytes.toBytes(word)), execRow);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
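    Similarly, the reduce function above would be declared in a reducer class shaped like this, again inferred from the method signature:

    public static class MyReducer
            extends Reducer<Text, IntWritable, ImmutableBytesWritable, ExecRow> {
        // reduce(...) as shown above
    }
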
  6. Commit or roll back the job:

    If the job is successful, commit the transaction.

    job.commit();
    

    If the job fails, roll back the transaction.

    job.rollback();
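
    Putting the pieces together, here is a minimal sketch of the submit-then-commit flow. It assumes the job object is a SpliceJob that extends the standard Hadoop Job (so waitForCompletion is available) alongside the commit and rollback methods described earlier:

    try {
        if (job.waitForCompletion(true)) {
            job.commit();    // job succeeded: make its writes durable
        } else {
            job.rollback();  // job failed: discard its writes
        }
    } catch (Exception e) {
        job.rollback();
        throw e;
    }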