Behaviours: riak_pipe_vnode_worker.
A "reduce"-like fitting (in the MapReduce sense). Really more
like a keyed list-fold. This fitting expects inputs of the
form {Key, Value}. For each input, the fitting evaluates a
function (its argument) on the Value and any previous result
for that Key, or [] (the empty list) if that Key has never
been seen by this worker. When done is finally received, the
fitting sends each key-value pair it has evaluated as an
input to the next fitting.
The intent is that a fitting might receive a stream of inputs
like {a, 1}, {b, 2}, {a, 3}, {b, 4} and send on results like
{a, 4}, {b, 6} by using a simple "sum" function.
The fitting's argument must be a function of arity 4, called
with the following parameters:

Key :: term()
InAccumulator :: [term()] - the new input value cons'd onto
the previous result for this Key (or the empty list
if this is the first evaluation for the key)
Partition :: riak_pipe_vnode:partition()
FittingDetails :: #fitting_details{}

The function should return {ok, NewAccumulator}, where
NewAccumulator is a list, onto which the next input will be
cons'd. For example, the function to sum values for a key, as
described above, might look like:

fun(_Key, Inputs, _Partition, _FittingDetails) -> {ok, [lists:sum(Inputs)]} end
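To make the accumulation concrete, here is a small standalone sketch (not part of this module; the helper name apply_reduce/3 and the atoms used for Key/Partition/FittingDetails are invented for illustration) that applies the "sum" function above by hand:

```erlang
%% Illustrative helper: applies a reduce fun to one input by cons'ing
%% the new Value onto the previous accumulator, as described above.
apply_reduce(Fun, Value, PrevAcc) ->
    {ok, NewAcc} = Fun(key, [Value | PrevAcc], partition, details),
    NewAcc.
```

With the sum fun, feeding 1 and then 3 for the same key yields the accumulator [1] and then [4], matching the {a, 4} result in the example stream.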
The preferred consistent-hash function for this fitting is
chashfun/1. It hashes the input Key. Any other partition
function should work, but beware that a function that sends
values for the same Key to different partitions will result
in fittings down the pipe receiving multiple results for the
Key.
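The key-only hashing behaviour can be sketched as follows. This uses erlang:phash2/1 purely for illustration (the real chashfun/1 uses Riak's consistent hash), and the function name is invented:

```erlang
%% Illustration only: like chashfun/1, this hashes just the Key half
%% of a {Key, Value} input, so inputs with equal keys always map to
%% the same partition regardless of their values.
hash_input({Key, _Value}) ->
    erlang:phash2(Key).
```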
abstract datatype: state()
archive/1 | The archive is just the store (dict()) of evaluation results. |
chashfun/1 | The preferred hashing function. |
done/1 | Unless the aggregation function sends its own outputs, done/1 is where all outputs are sent. |
handoff/2 | The handoff merge is simply a dict:merge, where entries for the same key are concatenated. |
init/2 | Setup creates the store for evaluation results (a dict()) and stashes away the Partition and FittingDetails for later. |
process/3 | Process looks up the previous result for the Key, and then evaluates the function on that with the new Input. |
validate_arg/1 | Check that the arg is a valid arity-4 function. |
archive(State::state()) -> {ok, dict()}
The archive is just the store (dict()) of evaluation results.
chashfun(X1::{term(), term()}) -> riak_pipe_vnode:chash()
The preferred hashing function. Chooses a partition based on
the hash of the Key.
done(State::state()) -> ok
Unless the aggregation function sends its own outputs, done/1 is where all outputs are sent.
The handoff merge is simply a dict:merge, where entries for
the same key are concatenated. The reduce function is also
re-evaluated for the key, such that done/1 still has the
correct value to send, even if no more inputs arrive.
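The merge step described above can be sketched like this (the function name is invented, and the re-evaluation of the reduce fun on each merged key is elided):

```erlang
%% Sketch of the handoff merge: a dict:merge where the accumulator
%% lists for the same key are concatenated. The real handoff/2
%% additionally re-runs the reduce fun on each merged key.
merge_archives(Mine, Received) ->
    dict:merge(fun(_Key, A, B) -> A ++ B end, Mine, Received).
```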
init(Partition::riak_pipe_vnode:partition(), FittingDetails::riak_pipe_fitting:details()) -> {ok, state()}
Setup creates the store for evaluation results (a dict()) and
stashes away the Partition and FittingDetails for later.
Process looks up the previous result for the Key, and then
evaluates the function on that with the new Input.
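A minimal sketch of that lookup-and-evaluate step, assuming the result store is a dict() as described in init/2 (the helper name is invented and the state record is flattened into plain arguments):

```erlang
%% Sketch of process/3's core: find the previous accumulator for Key
%% (or [] if the key is new), cons the new Input onto it, evaluate
%% the reduce fun, and store the result back.
reduce_one(Key, Input, Fun, Store, Partition, Details) ->
    Prev = case dict:find(Key, Store) of
               {ok, Acc} -> Acc;
               error     -> []
           end,
    {ok, New} = Fun(Key, [Input | Prev], Partition, Details),
    dict:store(Key, New, Store).
```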
validate_arg(Fun::term()) -> ok | {error, iolist()}
Check that the arg is a valid arity-4 function. See
riak_pipe_v:validate_function/3.
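The essence of the check can be sketched as below; the real validate_arg/1 delegates to riak_pipe_v:validate_function/3, which produces the descriptive error iolist (function name invented here):

```erlang
%% Simplified arity check in the spirit of validate_arg/1: accept
%% only funs of arity 4, reject anything else with an error iolist.
validate(Fun) when is_function(Fun, 4) ->
    ok;
validate(Other) ->
    {error, io_lib:format("expected an arity-4 function, got: ~p", [Other])}.
```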
Generated by EDoc, Apr 3 2013, 21:57:22.