%FusingOptimizer Optimizer that attempts to fuse closures, primarily
%aggregations.

% Copyright 2016 The MathWorks, Inc.

classdef FusingOptimizer < matlab.bigdata.internal.Optimizer
    
    properties
        % Set MaxFuseAttempts to 1 since we currently expect all aggregation fusing to
        % occur in the first pass. (This avoids recalculating the graph.)
        MaxFuseAttempts = 1;
        Debug = false;
    end
    
    methods (Access = private)
        function tf = combineAggregations(~, closureGraph, dist, redDepths)

            tf = false;

            isAggregation  = closureGraph.Nodes.OpType == 'AggregateOperation';
            uniqueReductionDepths = unique(redDepths(isAggregation));

            for depthId = 1:numel(uniqueReductionDepths)
                rows = (redDepths == uniqueReductionDepths(depthId)) & isAggregation;
                if sum(rows) > 1
                    % Assert that there are no data dependencies. fusingDistances is a distance
                    % matrix for the nodes we're trying to fuse. It should be 0
                    % on the diagonal and Inf off the diagonal.
                    fusingDistances   = dist(rows, rows);
                    expectedDistances = (1 ./ eye(size(fusingDistances))) - 1;
                    assert(isequal(fusingDistances, expectedDistances), ...
                           'Attempt to fuse operations with a data dependency.');
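                    % Illustrative example (not part of the original code): for
                    % three independent aggregations,
                    %   (1 ./ eye(3)) - 1
                    % evaluates to
                    %   [ 0  Inf Inf
                    %     Inf  0  Inf
                    %     Inf Inf  0 ]
                    % i.e. zero distance from each node to itself and no
                    % directed path between any pair, which is exactly the
                    % "no data dependency" pattern the assert checks for.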
                    
                    % Got multiple aggregations to fuse
                    aggregationsToFuseC = closureGraph.Nodes.NodeObj(rows);
                    iFuseAggregations([aggregationsToFuseC{:}]);
                    tf = true;
                end
            end
        end
        
        function tf = combineAggregationsByKey(~, closureGraph, dist, redDepths)
        % The constraints for combineAggregationsByKey are quite severe. We need to find
        % two or more AggregateByKey operations (reducebykeyfun operations end
        % up here too) where the key inputs are identical, the reduction
        % depths are the same, and the tall sizes of all inputs are identical.
            tf = false;
            
            isAggregationByKey    = closureGraph.Nodes.OpType == 'AggregateByKeyOperation';
            uniqueReductionDepths = unique(redDepths(isAggregationByKey));
            
            for depthId = 1:numel(uniqueReductionDepths)
                rowIsThisDepth = (redDepths == uniqueReductionDepths(depthId)) & isAggregationByKey;
                numAggByKey    = sum(rowIsThisDepth);
                if numAggByKey > 1
                    % We need to group by unique key inputs: walk back up the execution
                    % graph from the first input to each closure, stepping over
                    % "iAssertAdaptorInfoCorrect" nodes, looking for a common
                    % ClosurePromise.
                    aggByKeyObjsCell   = closureGraph.Nodes.NodeObj(rowIsThisDepth);
                    keyInputPromiseIds = cellfun(@(closure) iFindUnderlyingPromiseId(closure.InputFutures(1)), ...
                                                 aggByKeyObjsCell, 'UniformOutput', false);
                    aggregationGroup   = findgroups(keyInputPromiseIds);
                    for groupIdx = 1:max(aggregationGroup)
                        match      = aggregationGroup == groupIdx;
                        numMatches = sum(match);
                        if numMatches > 1
                            % Assert that there are no data dependencies. fusingDistances is a distance
                            % matrix for the nodes we're trying to fuse. It should be 0
                            % on the diagonal and Inf off the diagonal.
                            fusingIdx         = find(rowIsThisDepth);
                            fusingIdx         = fusingIdx(match);
                            fusingDistances   = dist(fusingIdx, fusingIdx);
                            expectedDistances = (1 ./ eye(size(fusingDistances))) - 1;
                            assert(isequal(fusingDistances, expectedDistances), ...
                                   'Attempt to fuse operations with a data dependency.');
                            
                            % Actually fuse the aggregations.
                            iFuseAggregationsByKey([aggByKeyObjsCell{match}]);
                            tf = true;
                        end
                    end
                end
            end
            
        end
    end
    methods
        function tf = optimize(obj, varargin)
            done = false;
            tf   = false;
            count = 0;
            closureGraph = matlab.bigdata.internal.optimizer.ClosureGraph(varargin{:});
            while ~done
                graphObj = closureGraph.Graph;
                
                % We need to generate a topological sort of the graph so that we can calculate
                % the tall sizes from the graph connectivity
                order = toposort(graphObj);
                
                % tallSize is +ve for real known sizes, -ve for 'symbolic' sizes. Symbolic sizes
                % are sizes that we know are identical to one another, but we
                % don't (yet) know the actual size.
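                % For example (illustrative values): tallSize = [-1; -1; 100]
                % would mean nodes 1 and 2 share an as-yet-unknown tall size,
                % while node 3 is known to have exactly 100 rows.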
                tallSize = nan(numel(order), 1);

                % reductionDepth is zero for nodes with no predecessors, and increases as
                % reductions are encountered.
                reductionDepth = zeros(numel(order), 1);

                % reductionCombinations is a map from a list of input depths to a resulting
                % output depth. It's used to ensure that we don't create a new
                % reduction depth for each time we encounter a given combination
                % of input reduction depths.
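                % For example (illustrative): two separate aggregations whose
                % inputs all sit at reduction depth 0 both map to the same new
                % output depth, which is what later allows them to be fused.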
                reductionCombinations = containers.Map();

                % inSize is the input size Id for a given node, or NaN if multiple sizes are
                % input to a single node. We use this to be conservative about
                % fusing aggregations - we'll only fuse those aggregations where
                % we can prove they have well-known and unique input sizes.
                inSize   = nan(numel(order), 1);

                % Extract the node list up front so that we minimise the number of times we use
                % the digraph/subsref implementation.
                nodeList = graphObj.Nodes.NodeObj;
                opTypes  = graphObj.Nodes.OpType;

                % Topological sort guarantees that each node we iterate over has all predecessor
                % information available
                for idx = 1:numel(order)
                    nodeIdx = order(idx);
                    nodeObj = nodeList{nodeIdx};
                    opType  = opTypes(nodeIdx);
                    [tallSize, reductionDepth, inSize(nodeIdx)] = ...
                        iCalcTallSizeReductionDepth(graphObj, nodeObj, nodeIdx, opType, tallSize, reductionDepth, ...
                                                    reductionCombinations);
                end

                % Calculate all (directed) distances between nodes so that we can check that we
                % don't accidentally try to fuse aggregations which have data
                % dependencies between them.
                dist     = distances(graphObj);
                combined = combineAggregations(obj, graphObj, dist, reductionDepth);
                combined = combineAggregationsByKey(obj, graphObj, dist, reductionDepth) || combined;
                count = 1 + count;
                done = ~combined || count >= obj.MaxFuseAttempts;
                tf = tf || combined;
                if ~done && combined
                    % We made changes, recompute the closure graph.
                    recalculate(closureGraph);
                end
            end
        end
    end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Derive a real or symbolic tall output size for a given closure. Here we rely
% on the fact that all outputs for any type of closure must have the same tall
% size.
%
% Since the computation is very closely related, we also compute the updated
% "reduction depth" while we're here.
%
% For elementwise and slicewise operation types, if all the input sizes are
% identical (or known to be 1), then the output size is the same as all input
% sizes. In all other cases, a new symbolic output size is allocated.
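% A small worked case (hypothetical sizes): a slicewise closure with predecessor
% tall sizes [-2 1 -2] ignores the size-1 input and propagates -2; if the
% remaining sizes disagree (say [-2 -3]), the default branch below allocates a
% fresh symbolic size instead.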
function [tsz, redDepth] = iDeriveTallSizeReductionDepthForClosure(opType, ...
                                   predecessorIdxs, tszs, redDepths, redCombin)

    % Default: allocate a new symbolic tall size
    tsz = min(tszs - 1);

    % Default: allocate a new reduction depth
    redDepth = max(redDepths) + 1;

    % For elementwise and slicewise, we know that we *might* be able to propagate
    % the size
    isSizePreserving = opType == 'ElementwiseOperation' || ...
        opType == 'SlicewiseOperation';
    if isSizePreserving
        inputSizes = tszs(predecessorIdxs);
        % Ignore inputs which are size 1 in the tall dimension
        inputSizes(inputSizes == 1) = [];
        
        if isempty(inputSizes) && ~isempty(predecessorIdxs)
            % There were some predecessors, but they were all size 1 in the tall dimension.
            tsz = 1;
        elseif numel(unique(inputSizes)) == 1
            % All sizes the same, propagate. These sizes might be real +ve sizes, or
            % symbolic -ve sizes.
            tsz = inputSizes(1);
        end
    end

    isDepthPreserving = opType == 'ElementwiseOperation' || ...
        opType == 'SlicewiseOperation' || ...
        opType == 'FilterOperation' || ...
        opType == 'ChunkwiseOperation';
    uniqueInputDepths = unique(redDepths(predecessorIdxs));

    if isDepthPreserving
        redDepth = max(uniqueInputDepths);
    else
        depthKey = num2str(uniqueInputDepths);
        if redCombin.isKey(depthKey)
            redDepth = redCombin(depthKey);
        else
            % Use the new value
            redCombin(depthKey) = redDepth; %#ok<NASGU> handle
        end
    end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% This function is called in a loop on a topologically sorted list of nodes. It
% updates elements of 'tszs' - the tall output size array, and also calculates
% 'insz', the tall size of the inputs to the closure
function [tszs, redDepths, insz] = iCalcTallSizeReductionDepth(graphObj, node, nodeIdx, opType, ...
                                                      tszs, redDepths, redCombin)
    pred = predecessors(graphObj, nodeIdx);
    if isClosure(node)
        if isempty(pred)
            % No predecessors - allocate a new symbolic size for the output of this closure.
            tszs(nodeIdx) = min(-1, min(tszs) - 1);
            % reductionDepth is initialized to zero - nothing to do here.
        else
            % Need to derive from inputs
            [tszs(nodeIdx), redDepths(nodeIdx)] = ...
                iDeriveTallSizeReductionDepthForClosure(opType, pred, tszs, redDepths, redCombin);
        end
    elseif isPromise(node) && node.IsDone
        % Completed promise - we have the actual data, so use that to get a real size.
        tszs(nodeIdx) = size(node.CachedValue, 1);
        % reductionDepth can stay at zero.
    elseif isPromise(node) || isFuture(node)
        % Uncompleted promise or any future - output size is simply input size.
        assert(isscalar(pred));
        assert(~isnan(tszs(pred)));
        tszs(nodeIdx) = tszs(pred);
        % Likewise, reduction depth unchanged
        redDepths(nodeIdx) = redDepths(pred);
    else
        assert(false);
    end
    
    % Compute the input size for this node. If all predecessor output sizes are
    % identical, that's our 'input size'. If they aren't, then we don't know
    % what the input size is, so return NaN.
    inszs = tszs(pred);
    if ~isempty(inszs) && numel(unique(inszs)) == 1
        insz = inszs(1);
    else
        insz = NaN;
    end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Given a list of closures, inject upstream closures to wrap inputs in a scalar
% cell array. Return an array of futures that corresponds to all the outputs of
% the given closures.
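% Illustrative effect (inferred from the surrounding comments, not verified
% against EncellificationOperation itself): a chunk X entering the fused
% aggregation arrives wrapped as the scalar cell {X}, so inputs whose tall
% sizes differ can still flow through a single fused closure.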
function futures = iAddEncellClosures(numInPerClosure, closuresToFuse)
    import matlab.bigdata.internal.lazyeval.ElementwiseOperation;
    import matlab.bigdata.internal.lazyeval.EncellificationOperation;
    import matlab.bigdata.internal.lazyeval.Closure;
    import matlab.bigdata.internal.FunctionHandle;

    % Allocate a cell array that will contain the futures prior to encellification.
    preEncellClosureInputFutures = cell(1, sum(numInPerClosure));
    
    % Iterate over the to-be-fused closures, wrapping each input future in a
    % pass-through elementwise closure that carries over the MaxNumSlices
    % constraint of the original per-chunk function handle.
    nextInputIdx = 1;
    for cidx = 1:numel(closuresToFuse)
        closure = closuresToFuse(cidx);
        for iidx = 1:numInPerClosure(cidx)
            % If MaxNumSlices is finite, we need a dummy operation to
            % attach this constraint. This is so that the en-cellification
            % operation itself does not have to deal with this.
            fcn = FunctionHandle(@deal, 'MaxNumSlices', ...
                closure.Operation.PerChunkFunctionHandle.MaxNumSlices);
            op = ElementwiseOperation(fcn, 1, 1);
            newClosure = Closure(closure.InputFutures(iidx), op);
            preEncellClosureInputFutures{nextInputIdx} = newClosure.OutputPromises.Future;
            nextInputIdx = 1 + nextInputIdx;
        end
    end
    preEncellClosureInputFutures = vertcat(preEncellClosureInputFutures{:});
    
    op = EncellificationOperation(numel(preEncellClosureInputFutures));
    newClosure = Closure(preEncellClosureInputFutures, op);

    futures = vertcat(newClosure.OutputPromises.Future);
    fcn = FunctionHandle(@deal);
    op = ElementwiseOperation(fcn, 1, 1);
    for iidx = 1:numel(futures)
        % TODO(g1394174): This is a workaround to an issue where
        % the optimizer errors if identical edges exist between one
        % source and destination node.
        newClosure = Closure(futures(iidx), op);
        futures(iidx) = newClosure.OutputPromises.Future;
    end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Given an array of promises referring to cell data, add closures that remove
% the wrapping layer of cell array.
function promises = iAddDecellClosures(cellifiedPromises)
    import matlab.bigdata.internal.lazyeval.ChunkwiseOperation;
    import matlab.bigdata.internal.lazyeval.Closure;
    import matlab.bigdata.internal.FunctionHandle;

    promises = cell(1, numel(cellifiedPromises));
    for outIdx = 1:numel(cellifiedPromises)
        cls = Closure(cellifiedPromises(outIdx).Future, ...
                      ChunkwiseOperation(FunctionHandle(@(x) x{1}), 1, 1));
        promises{outIdx} = cls.OutputPromises;
    end
    promises = vertcat(promises{:});
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Fusing aggregations works as follows: en-cellify all inputs, then build a
% single fused aggregation closure, then de-cellify all outputs.
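% Sketch of the rewrite (illustrative, for two arbitrary aggregations A and B):
%   encell(inputs of A), encell(inputs of B)
%       -> one fused AggregateOperation
%       -> decell each output
%       -> swap the de-cellified promises onto A's and B's original outputs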
function iFuseAggregations(closuresToFuse)
    
    import matlab.bigdata.internal.lazyeval.AggregateOperation;
    import matlab.bigdata.internal.lazyeval.ChunkwiseOperation;
    import matlab.bigdata.internal.lazyeval.Closure;
    import matlab.bigdata.internal.FunctionHandle;

    numInPerClosure = arrayfun(@(c) numel(c.InputFutures), closuresToFuse);
    numOutPerClosure = arrayfun(@(c) numel(c.OutputPromises), closuresToFuse);

    % Add calls to encell-ify the inputs
    fusedClosureInputFutures = iAddEncellClosures(numInPerClosure, closuresToFuse);
    
    originalOperations = arrayfun(@(c) c.Operation, closuresToFuse, 'UniformOutput', false);
    
    % The aggregation phase can emit a number of outputs that is different to the
    % number of inputs (and different to the overall number of outputs)
    numInPerFcnAggregatePhase  = numInPerClosure;
    numOutPerFcnAggregatePhase = cellfun(@(op) op.NumOutputs, originalOperations);
    originalAggregateFcnCell   = arrayfun(@(c) c.Operation.PerChunkFunctionHandle, ...
                                          closuresToFuse, 'UniformOutput', false);
    % For AggregateOperation, we're adding the cellification layer.
    doCellification = true;
    newAggregateFcn = FunctionHandle(@(varargin) iFusedFcn(doCellification, ...
                                                      originalAggregateFcnCell, ...
                                                      numInPerFcnAggregatePhase, ...
                                                      numOutPerFcnAggregatePhase, ...
                                                      varargin{:}), 'MaxNumSlices', 1);
    
    
    % Each reduction function has a number of inputs equal to the number of
    % outputs of the aggregation phase. The reduction function emits a number of
    % outputs equal to the overall number of outputs of the original closure.
    numInPerFcnReducePhase     = numOutPerFcnAggregatePhase;
    numOutPerFcnReducePhase    = numOutPerClosure;
    originalReduceFcnCell      = arrayfun(@(c) c.Operation.ReduceFunctionHandle, ...
                                          closuresToFuse, 'UniformOutput', false);
    newReduceFcn = FunctionHandle(@(varargin) iFusedFcn(doCellification, ...
                                                      originalReduceFcnCell, ...
                                                      numInPerFcnReducePhase, ...
                                                      numOutPerFcnReducePhase, ...
                                                      varargin{:}), 'MaxNumSlices', 2);

    % Build the new operation and corresponding closure.
    newOperation = AggregateOperation(newAggregateFcn, newReduceFcn, ...
                                      sum(numInPerClosure), sum(numOutPerClosure));
    newFusedClosure = Closure(fusedClosureInputFutures, newOperation);
    
    % De-cellify the outputs
    replacementPromises = iAddDecellClosures(newFusedClosure.OutputPromises);
    
    % Finally, swap over the new promises
    nextNewPromiseIdx = 1;
    for cidx = 1:numel(closuresToFuse)
        closure = closuresToFuse(cidx);
        for outIdx = 1:numel(closure.OutputPromises)
            swap(replacementPromises(nextNewPromiseIdx), closure.OutputPromises(outIdx));
            nextNewPromiseIdx = 1 + nextNewPromiseIdx;
        end
    end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% This function serves as the fused body of both the aggregation and reduction
% phases of an aggregation operation. It expects that all inputs have been
% wrapped in a scalar cell array, and it will wrap all outputs in a scalar cell
% array. This is to allow operations to be fused where the tall size does not
% necessarily match.
%
% underlyingFunctions is a cell array of FunctionHandle objects representing the
% original functions.
%
% numInPerFcn is a numeric vector specifying how many elements of 'varargin'
% each element of 'underlyingFunctions' should consume.
%
% numOutPerFcn is a numeric vector specifying how many elements of 'varargout'
% each element of 'underlyingFunctions' should provide.
%
% varargin are the chunks of data, wrapped in scalar cells.
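% For example (hypothetical shapes): with numInPerFcn = [2 1] and
% numOutPerFcn = [1 2], the first underlying function consumes varargin(1:2)
% and fills varargout(1), and the second consumes varargin(3) and fills
% varargout(2:3).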
function varargout = iFusedFcn(doCellification, underlyingFunctions, ...
                               numInPerFcn, numOutPerFcn, varargin)
    
    assert(numel(underlyingFunctions) == numel(numInPerFcn) && ...
           numel(underlyingFunctions) == numel(numOutPerFcn) && ...
           nargout == sum(numOutPerFcn) && ...
           numel(varargin) == sum(numInPerFcn));
    
    nextInputToConsume = 1;
    nextOutputToProvide = 1;
    varargout = cell(1, nargout);

    for fcnIdx = 1:numel(underlyingFunctions)
        
        % Pick off the inputs from varargin for this function
        numToConsume = numInPerFcn(fcnIdx);
        inputs = varargin(nextInputToConsume:(nextInputToConsume + numToConsume - 1));
        nextInputToConsume = numToConsume + nextInputToConsume;
        
        % De-cellify inputs and vertcat (for the reduction phase)
        if doCellification
            inputs = cellfun(@matlab.bigdata.internal.util.vertcatCellContents, inputs, 'UniformOutput', false);
        end
        
        % Allocate output cell array for this function
        numToProvide = numOutPerFcn(fcnIdx);
        outputs = cell(1, numToProvide);
        try
            [outputs{1:numToProvide}] = feval(underlyingFunctions{fcnIdx}, inputs{:});
        catch E
            % Here, we expect the error emanating from FunctionHandle/feval to have a cause
            % which has the correct information. To avoid this information being
            % "wrapped" twice by FunctionHandle/throwAsFunction, we peel off the
            % envelope.
            if strcmp(E.identifier, 'MATLAB:bigdata:array:FunctionHandleError') && ...
                    isscalar(E.cause)
                throw(E.cause{1});
            else
                % Unexpected...
                rethrow(E);
            end
        end
        
        % Re-cellify outputs and push back into varargout.
        if doCellification
            outputs = num2cell(outputs);
        end
        varargout(nextOutputToProvide:(nextOutputToProvide + numToProvide - 1)) = outputs;
        nextOutputToProvide = numToProvide + nextOutputToProvide;
    end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Walk up the execution graph to find the real promise Id corresponding to this
% future. This function is necessary so that we can step over "adaptor
% information assertion" functions.
function promiseId = iFindUnderlyingPromiseId(fut)
    done = false;
    while ~done
        assert(isscalar(fut), 'Future for iFindUnderlyingPromiseId must be scalar.');
        promise = fut.Promise;
        promiseId = promise.Id;
        if isempty(promise.Closure) || ~iIsAdaptorAssertionOrNoOp(promise.Closure)
            done = true;
        else
            % Walk past the closure to the next future and go around again.
            fut = promise.Closure.InputFutures;
        end
    end
end
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Does a closure correspond to iAssertAdaptorInfoCorrect or iNoOpForOptimizer?
function tf = iIsAdaptorAssertionOrNoOp(closure)
    tf = false;
    if isa(closure.Operation, 'matlab.bigdata.internal.lazyeval.ElementwiseOperation')
        fcn = closure.Operation.FunctionHandle;
        if isa(fcn, 'matlab.bigdata.internal.FunctionHandle')
            underlyingFcnStr = func2str(fcn.Handle);
            tf = ~isempty(strfind(underlyingFcnStr, 'iAssertAdaptorInfoCorrect')) ...
                || ~isempty(strfind(underlyingFcnStr, 'iNoOpForOptimizer'));
        end
    end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Fuse aggregations by key
function iFuseAggregationsByKey(closuresToFuse)
    import matlab.bigdata.internal.lazyeval.AggregateByKeyOperation;
    import matlab.bigdata.internal.FunctionHandle;
    import matlab.bigdata.internal.lazyeval.Closure;
    import matlab.bigdata.internal.lazyeval.SlicewiseOperation;
    
    % Calculate the number of inputs and outputs per *closure* (remembering that 'by
    % key' operations have the 'key' input/output which is not passed to the
    % aggregation/reduction functions).
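    % For example (illustrative): a by-key closure with input futures
    % (key, x, y) has numInPerClosure = 3 but contributes only 2 inputs to its
    % underlying aggregation function.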
    numInPerClosure    = arrayfun(@(c) numel(c.InputFutures), closuresToFuse);
    numOutPerClosure   = arrayfun(@(c) numel(c.OutputPromises), closuresToFuse);
    originalOperations = arrayfun(@(c) c.Operation, closuresToFuse, 'UniformOutput', false);

    % Get the input list for the fused AggregateByKeyOperation
    fusedAggByKeyInputFutures = iCalcInputsForFusedAggByKey(numInPerClosure, closuresToFuse);
    
    % Number of inputs/outputs per *function* is one less than the number of inputs
    % per *closure*
    numInPerFcnAggregatePhase  = numInPerClosure - 1;
    numOutPerFcnAggregatePhase = cellfun(@(op) op.NumOutputs, originalOperations) - 1;
    originalAggregateFcnCell   = arrayfun(@(c) c.Operation.PerChunkFunctionHandle, ...
                                          closuresToFuse, 'UniformOutput', false);
    maxNumSlicesForAggregation = min(arrayfun(@(c) c.Operation.PerChunkFunctionHandle.MaxNumSlices, ...
                                              closuresToFuse));
    % For AggregateByKey fusings, we're much more constrained in the sizes that we
    % accept - and thus we don't need en-cellification.
    doCellification = false;
    newAggregateFcn = FunctionHandle(@(varargin) iFusedFcn(doCellification, ...
                                                      originalAggregateFcnCell, ...
                                                      numInPerFcnAggregatePhase, ...
                                                      numOutPerFcnAggregatePhase, ...
                                                      varargin{:}), ...
                                     'MaxNumSlices', maxNumSlicesForAggregation);
    
    % For AggregateByKey fusings, number of inputs & outputs per *function* in the
    % reduce phase is the same as the number of *function* outputs from the
    % aggregate phase.
    numInPerFcnReducePhase   = numOutPerFcnAggregatePhase;
    numOutPerFcnReducePhase  = numOutPerFcnAggregatePhase;
    originalReduceFcnCell    = arrayfun(@(c) c.Operation.ReduceFunctionHandle, ...
                                       closuresToFuse, 'UniformOutput', false);
    maxNumSlicesForReduction = min(arrayfun(@(c) c.Operation.ReduceFunctionHandle.MaxNumSlices, ...
                                            closuresToFuse));
    newReduceFcn             = FunctionHandle(@(varargin) iFusedFcn(doCellification, ...
                                                      originalReduceFcnCell, ...
                                                      numInPerFcnReducePhase, ...
                                                      numOutPerFcnReducePhase, ...
                                                      varargin{:}), ...
                                              'MaxNumSlices', maxNumSlicesForReduction);
    
    % Put the two new functions together into a single AggregateByKeyOperation and
    % Closure.
    numInFusedOperation  = 1 + sum(numInPerFcnAggregatePhase);
    numOutFusedOperation = 1 + sum(numOutPerFcnReducePhase);
    newOperation         = AggregateByKeyOperation(newAggregateFcn, newReduceFcn, ...
                                           numInFusedOperation, numOutFusedOperation);
    newFusedClosure      = Closure(fusedAggByKeyInputFutures, newOperation);
    
    % Next, we need to deal the outputs of the newFusedClosure to replicate the
    % 'key' output once for each original input closure. This is required
    % because we cannot (using only "swap" on promises) hook up the single key
    % output from the fused AggregateByKeyOperation to the multiple key outputs
    % of the original operations.
    numInDealPhase   = numOutFusedOperation;
    numOutDealPhase  = sum(numOutPerClosure);
    fusedOutputsCell = arrayfun(@(x) x.Future, newFusedClosure.OutputPromises, ...
                                'UniformOutput', false);
    dealFcn          = FunctionHandle(@(varargin) iDealFcn(numOutPerClosure, varargin{:}));
    dealOperation    = SlicewiseOperation(dealFcn, numInDealPhase, numOutDealPhase);
    dealClosure      = Closure([fusedOutputsCell{:}], dealOperation);
    
    % Finally, swap over the new promises
    nextNewPromiseIdx = 1;
    for cidx = 1:numel(closuresToFuse)
        closure = closuresToFuse(cidx);
        for outIdx = 1:numel(closure.OutputPromises)
            swap(dealClosure.OutputPromises(nextNewPromiseIdx), closure.OutputPromises(outIdx));
            nextNewPromiseIdx = 1 + nextNewPromiseIdx;
        end
    end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Compute the input futures for a fused AggregateByKey closure. We need to pick
% only a single 'key' input from the first closure, and then the remaining
% inputs from each closure. Preceding logic should ensure that the 'key' inputs
% for all fusing closures are equivalent.
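% For example (illustrative): fusing closures with inputs (key, a, b) and
% (key, c) yields the fused input list [key a b c] - one shared key followed
% by every non-key input in closure order.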
function fusedInputFutures = iCalcInputsForFusedAggByKey(numInPerClosure, closuresToFuse)
    numInFused = 1 + sum(numInPerClosure - 1);
    fusedInCell = cell(numInFused, 1);
    % The single copy of the key input
    fusedInCell{1} = closuresToFuse(1).InputFutures(1);
    nextFusedInIdx = 2;
    % Pick up the remaining inputs.
    for cIdx = 1:numel(closuresToFuse)
        closure = closuresToFuse(cIdx);
        for fIdx = 2:numel(closure.InputFutures)
            fusedInCell{nextFusedInIdx} = closure.InputFutures(fIdx);
            nextFusedInIdx = 1 + nextFusedInIdx;
        end
    end
    fusedInputFutures = [fusedInCell{:}];
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
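% Deal the outputs of the fused by-key closure back out to the original
% closures: the single fused 'key' output is replicated once per original
% closure, followed by that closure's remaining outputs. For example
% (illustrative): with numOutPerClosure = [2 3], the result is
%   {key, varargin{1}, key, varargin{2}, varargin{3}}.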
function varargout = iDealFcn(numOutPerClosure, key, varargin)
    varargout = cell(1, sum(numOutPerClosure));
    nextOutputIdx = 1;
    nextInputIdx  = 1;
    for closureIdx = 1:numel(numOutPerClosure)
        varargout{nextOutputIdx} = key;
        nextOutputIdx = 1 + nextOutputIdx;
        for outputIdx = 2:numOutPerClosure(closureIdx)
            varargout{nextOutputIdx} = varargin{nextInputIdx};
            nextOutputIdx = 1 + nextOutputIdx;
            nextInputIdx  = 1 + nextInputIdx;
        end
    end
end