% gusucode.com > bigdata toolbox MATLAB source code > bigdata/+matlab/+bigdata/+internal/+lazyeval/extractHead.m
function out = extractHead(partitionedArray, n)
%EXTRACTHEAD Extract the head (first rows) of the provided tall array.
%
%   H = extractHead(partitionedArray, n) extracts the head (first rows)
%   of the provided partitioned array of size up to n in the tall dimension.
%
%   Copyright 2015-2016 The MathWorks, Inc.

BIG_N = 1e5;
if n > BIG_N
    % For large N, minimize communication by using a mapping of partitions
    % to number of slices to include in the head so that we only get the
    % rows we need.

    % Pass 1: per chunk, how many slices it could contribute and from
    % which partition.
    [numSlices, partitionId] = partitionheadfun(@(info, v) iGetChunkSize(info, v, n), partitionedArray);
    % Collapse the per-chunk counts into one (partition, count) pair for
    % each partition that is needed to build the head.
    [numSlices, partitionId] = clientfun(@(ns, p) iComputePartitionSlices(ns, p, n), numSlices, partitionId);
    partitionId = matlab.bigdata.internal.broadcast(partitionId);
    numSlices = matlab.bigdata.internal.broadcast(numSlices);
    % Pass 2: extract only the planned number of slices from each selected
    % partition.
    [partitionedArray, partitionedSliceIds] = partitionheadfun(@iSelectN, partitionedArray, partitionId, numSlices);
else
    % For small N, take up to n slices from the start of every partition
    % and let the reduction below pick the overall first n.
    [partitionedArray, partitionedSliceIds] = partitionheadfun(@(info, v) iFirstNWithEarlyExit(n, v, info), partitionedArray);
end
[out, ~] = reducefun(@(v, s) iFirstN(n, v, s), partitionedArray, partitionedSliceIds);
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
function [out, sliceId] = iFirstN(n, v, sliceId)
% Return up to n slices of v, selected and ordered by ascending rows of
% sliceId, together with the matching rows of sliceId.
%
% We sort on absolute index of the original array as during a reduction,
% v is not guaranteed to be in order. For example, partition 2 might be
% processed before partition 1.
[sliceId, idx] = sortrows(sliceId);
sliceId = sliceId(1:min(n, end), :);
idx = idx(1:min(n, end));

szV = size(v);
out = v(idx, :);
if numel(szV) > 2
    % Indexing with (idx, :) flattens the trailing dimensions into one;
    % restore the original trailing shape with the new slice count.
    szV(1) = numel(idx);
    out = reshape(out, szV);
end

if isnumeric(v) && ~isreal(v) && isreal(out)
    % Keep the result complex when the input was complex, even if the
    % selected slices ended up stored as real.
    out = complex(out);
end
end

function [hasFinished, out, sliceId] = iFirstNWithEarlyExit(n, v, info)
% Emit up to n slices from the start of the current partition, with a flag
% saying whether this partition has nothing further to contribute.
[hasFinished, numSlicesToEmit] = iGetChunkSize(info, v, n);

% This pair of indices is equivalent to the absolute index of the slice
% with respect to the ordering given by sortrows.
sliceId = [info.PartitionId * ones(numSlicesToEmit, 1), info.RelativeIndexInPartition - 1 + (1:numSlicesToEmit)'];
[out, sliceId] = iFirstN(numSlicesToEmit, v, sliceId);
end

function [hasFinished, numSlicesToEmit, partitionId] = iGetChunkSize(info, v, N)
% Work out how many slices of chunk v fall within the first N slices of its
% partition.
%
% numSlicesToEmit and partitionId are returned empty when the chunk
% contributes nothing.
numSlices = size(v, 1);
% Presumably info.RelativeIndexInPartition is the 1-based index, within the
% partition, of the first slice of this chunk - TODO confirm.
numRemainingSlices = max(N - info.RelativeIndexInPartition + 1, 0);
numSlicesToEmit = min(numRemainingSlices, numSlices);

if numSlicesToEmit == 0
    % NOTE(review): a zero-row chunk also takes this branch and reports
    % hasFinished = true even when more slices are still wanted - confirm
    % that such chunks cannot occur before the end of a partition.
    hasFinished = true;
    numSlicesToEmit = [];
    partitionId = [];
else
    partitionId = info.PartitionId;
    % Finished once the partition is exhausted or this chunk supplies all
    % of the slices that were still wanted.
    hasFinished = info.IsLastChunk || (numRemainingSlices == numSlicesToEmit);
end
end

function [numSlicesFromPartition, partitionsToSelectFrom] = iComputePartitionSlices(numSlices, partitionIds, N)
% Aggregate per-chunk slice counts into one count per partition and select
% the leading partitions that together supply the first N slices.
%
% Inputs:
%   numSlices    - slice count reported for each chunk
%   partitionIds - partition id reported for each chunk (same length)
%   N            - number of slices wanted in the head
% Outputs:
%   numSlicesFromPartition - slices to take from each selected partition
%   partitionsToSelectFrom - ids of the selected partitions, in the sorted
%                            order returned by unique
pIds = unique(partitionIds);
numSlicesFromPartition = zeros(size(pIds));
for ii = 1:numel(pIds)
    numSlicesFromPartition(ii) = sum(numSlices(partitionIds == pIds(ii)));
end

% Number of slices supplied by all earlier partitions.
numSlicesBefore = cumsum(numSlicesFromPartition) - numSlicesFromPartition;

% A partition is needed whenever fewer than N slices precede it. This
% includes the partition that straddles the N-th slice: dropping it (as a
% test of cumsum(...) > N would) leaves the head short whenever the
% cumulative counts do not land exactly on N.
isNeeded = numSlicesBefore < N;
partitionsToSelectFrom = pIds(isNeeded);

% Take whole partitions, except the last one, which is clipped so that the
% total does not exceed N.
numSlicesFromPartition = min(numSlicesFromPartition(isNeeded), N - numSlicesBefore(isNeeded));
end

function [hasFinished, out, sliceId] = iSelectN(info, v, p, n)
% Emit the planned number of slices for the current partition.
%
% p lists the selected partition ids and n the number of slices to take
% from each of them, in matching order.
N = n(p == info.PartitionId);
if isempty(N)
    % This partition was not selected: emit nothing and stop immediately.
    hasFinished = true;
    out = matlab.bigdata.internal.util.indexSlices(v, []);
    sliceId = zeros(0, 2);
else
    [hasFinished, out, sliceId] = iFirstNWithEarlyExit(N, v, info);
end
end