Module riak_pipe_w_reduce

A "reduce"-like fitting (in the MapReduce sense).

Behaviours: riak_pipe_vnode_worker.

Description

A "reduce"-like fitting (in the MapReduce sense). Really more like a keyed list-fold. This fitting expects inputs of the form {Key, Value}. For each input, the fitting evaluates a function (its argument) on the Value and any previous result for that Key, or [] (the empty list) if that Key has never been seen by this worker. When done is finally received, the fitting sends each key-value pair it has evaluated as an input to the next fitting.

The intent is that a fitting might receive a stream of inputs like {a, 1}, {b, 2}, {a, 3}, {b, 4} and send on results like {a, 4}, {b, 6} by using a simple "sum" function.

This fitting expects a function as its argument. The function should be arity-4 and expect the arguments:

Key :: term()
    Whatever aggregation key is necessary for the algorithm.
InAccumulator :: [term()]
    A list composed of the new input, cons'd on the front of the result of the last evaluation for this Key (or the empty list if this is the first evaluation for the key).
Partition :: riak_pipe_vnode:partition()
    The partition of the vnode on which this worker is running. (Useful for logging, or sending other output.)
FittingDetails :: #fitting_details{}
    The details of this fitting. (Useful for logging, or sending other output.)

The function should return a tuple of the form {ok, NewAccumulator}, where NewAccumulator is a list, onto which the next input will be cons'd. For example, the function to sum values for a key, as described above, might look like:
  fun(_Key, Inputs, _Partition, _FittingDetails) ->
     {ok, [lists:sum(Inputs)]}
  end
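
To make the keyed list-fold concrete, the following is a minimal sketch that simulates a single worker outside of Riak Pipe. The module name reduce_sketch, the run/2 helper, and the placeholder values passed for Partition and FittingDetails are assumptions made for illustration; they are not part of riak_pipe_w_reduce.

```erlang
-module(reduce_sketch).
-export([run/2, sum_fun/4]).

%% The "sum" reduce function from the text above.
sum_fun(_Key, Inputs, _Partition, _FittingDetails) ->
    {ok, [lists:sum(Inputs)]}.

%% Simulate one worker: fold Fun over a stream of {Key, Value} inputs,
%% keeping the latest accumulator for each Key in a dict.
%% 0 and undefined are placeholders for Partition and FittingDetails
%% (hypothetical values; a real worker receives them at init time).
run(Fun, Inputs) ->
    Store = lists:foldl(
              fun({Key, Value}, Acc) ->
                      %% Previous result for this Key, or [] if unseen.
                      Prev = case dict:find(Key, Acc) of
                                 {ok, P} -> P;
                                 error   -> []
                             end,
                      %% Cons the new input onto the previous result.
                      {ok, New} = Fun(Key, [Value | Prev], 0, undefined),
                      dict:store(Key, New, Acc)
              end,
              dict:new(),
              Inputs),
    %% The pairs that would be available to send once "done" arrives.
    lists:sort(dict:to_list(Store)).
```

Calling reduce_sketch:run(fun reduce_sketch:sum_fun/4, [{a, 1}, {b, 2}, {a, 3}, {b, 4}]) yields [{a, [4]}, {b, [6]}]. Each stored accumulator is a one-element list because the reduce function must return a list.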

The preferred consistent-hash function for this fitting is chashfun/1. It hashes the input Key. Any other partition function should work, but beware that a function that sends values for the same Key to different partitions will result in fittings down the pipe receiving multiple results for the Key.
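
The reason for hashing only the Key can be illustrated with a small stand-in. This sketch does not reproduce chashfun/1; it uses erlang:phash2/2 in place of the real consistent hash, and the module name chash_sketch and the NumPartitions parameter are hypothetical.

```erlang
-module(chash_sketch).
-export([partition_for/2]).

%% Choose one of NumPartitions slots using the Key only.
%% erlang:phash2/2 is a stand-in for the real consistent hash (assumption).
%% The Value is ignored, so every input for a Key lands in the same slot.
partition_for({Key, _Value}, NumPartitions) ->
    erlang:phash2(Key, NumPartitions).
```

Because the Value never enters the hash, {a, 1} and {a, 3} map to the same slot, so one worker sees every input for a and emits a single result for it.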

This fitting produces, as its archive, the store of evaluation results for the keys it has seen. To merge handoff values, the lists stored with each key are concatenated, and the reduce function is re-evaluated.

Data Types

state()

abstract datatype: state()

Function Index

archive/1        The archive is just the store (dict()) of evaluation results.
chashfun/1       The preferred hashing function.
done/1           Unless the aggregation function sends its own outputs, done/1 is where all outputs are sent.
handoff/2        The handoff merge is simply a dict:merge, where entries for the same key are concatenated.
init/2           Setup creates the store for evaluation results (a dict()) and stashes away the Partition and FittingDetails for later.
process/3        Process looks up the previous result for the Key, and then evaluates the function on that with the new Input.
validate_arg/1   Check that the arg is a valid arity-4 function.

Function Details

archive/1

archive(State::state()) -> {ok, dict()}

The archive is just the store (dict()) of evaluation results.

chashfun/1

chashfun(X1::{term(), term()}) -> riak_pipe_vnode:chash()

The preferred hashing function. Chooses a partition based on the hash of the Key.

done/1

done(State::state()) -> ok

Unless the aggregation function sends its own outputs, done/1 is where all outputs are sent.

handoff/2

handoff(HandoffAccs::dict(), State::state()) -> {ok, state()}

The handoff merge is simply a dict:merge, where entries for the same key are concatenated. The reduce function is also re-evaluated for the key, such that done/1 still has the correct value to send, even if no more inputs arrive.
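
The merge described above can be sketched with dict:merge/3 on plain dicts. This is an illustration under assumptions, not the module's code: handoff_sketch and merge/3 are hypothetical names, the reduce function is passed in explicitly rather than read from the worker state, and 0 and undefined stand in for Partition and FittingDetails.

```erlang
-module(handoff_sketch).
-export([merge/3]).

%% Merge the handed-off store into the local store.
%% For a Key present in both, concatenate the two lists and
%% re-evaluate the reduce function on the combined list.
%% Keys present in only one store are copied over unchanged.
merge(Fun, HandoffAccs, Accs) ->
    dict:merge(
      fun(Key, HandoffAcc, Acc) ->
              %% 0 and undefined are placeholder arguments (assumption).
              {ok, New} = Fun(Key, HandoffAcc ++ Acc, 0, undefined),
              New
      end,
      HandoffAccs,
      Accs).
```

With the "sum" function, merging a handoff store holding {a, [10]} and {c, [1]} into a local store holding {a, [4]} and {b, [6]} leaves a mapped to [14], while b and c keep their existing values.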

init/2

init(Partition::riak_pipe_vnode:partition(), FittingDetails::riak_pipe_fitting:details()) -> {ok, state()}

Setup creates the store for evaluation results (a dict()) and stashes away the Partition and FittingDetails for later.

process/3

process(X1::{term(), term()}, Last::boolean(), State::state()) -> {ok, state()}

Process looks up the previous result for the Key, and then evaluates the function on that with the new Input.

validate_arg/1

validate_arg(Fun::term()) -> ok | {error, iolist()}

Check that the arg is a valid arity-4 function. See riak_pipe_v:validate_function/3.
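
As a rough idea of what such a check involves, the sketch below tests the argument with the is_function/2 guard and reports the problem as an iolist. It is a stand-alone approximation and does not call riak_pipe_v:validate_function/3; the module name validate_sketch and the error wording are assumptions.

```erlang
-module(validate_sketch).
-export([validate_arg/1]).

%% Accept only funs of arity 4; otherwise describe what was received.
validate_arg(Fun) when is_function(Fun, 4) ->
    ok;
validate_arg(Fun) when is_function(Fun) ->
    %% A fun, but with the wrong number of arguments.
    {arity, N} = erlang:fun_info(Fun, arity),
    {error, io_lib:format("expected an arity-4 function, got arity ~b", [N])};
validate_arg(Other) ->
    %% Not a fun at all.
    {error, io_lib:format("expected an arity-4 function, got ~p", [Other])}.
```

Validating the argument up front lets a misconfigured fitting be rejected before any inputs are processed, rather than failing on the first input.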


Generated by EDoc, Apr 3 2013, 21:57:22.