% gusucode.com > bigdata toolbox MATLAB source code > bigdata/+matlab/+bigdata/+internal/+io/CacheStore.m

%CacheStore
% Helper base class that manages a collection of cache entries for one MATLAB
% Context. Two implementations are expected: DiskCacheStore and
% MemoryCacheStore. This is used by non-Spark back-ends.
%
% The public methods support usage as an array of CacheStore objects. In
% such cases, the array is expected to be in decreasing order of
% performance of the underlying storage.
%

%   Copyright 2016 The MathWorks, Inc.

classdef (Abstract) CacheStore < handle & matlab.mixin.Heterogeneous
    properties (SetAccess = immutable)
        % The maximum total size, in bytes, of the cache entries held by
        % this cache store. commitEntry evicts the least recently used
        % entries once the total exceeds this limit.
        MaxSize;
    end
    
    properties (Dependent, SetAccess = private)
        % A table of the cache entries currently held by this store, with
        % variables Id, PartitionIndex, StageIndex, Size and Data. Rows are
        % ordered from most recently used (first row) to least recently
        % used (last row). Backed by CacheEntriesImpl, which exists solely
        % to avoid holding onto a "table" when empty.
        CacheEntries;
    end
    
    methods
        % Return the cache entries table, or an empty table with the
        % expected variables when no entries are held.
        function out = get.CacheEntries(obj)
            if isempty(obj.CacheEntriesImpl)
                out = table(string.empty(0, 1), zeros(0, 1), zeros(0, 1), zeros(0, 1), cell(0, 1), ...
                            'VariableNames', {'Id', 'PartitionIndex', 'StageIndex', 'Size', 'Data'});
            else
                out = obj.CacheEntriesImpl;
            end
        end
        
        % Store the cache entries table. An empty table is stored as [] so
        % that no "table" object is retained.
        function set.CacheEntries(obj, newValue)
            if isempty(newValue)
                % Ensure we don't hold a table if we don't need to so that
                % 'table' can be cleared.
                obj.CacheEntriesImpl = [];
            else
                obj.CacheEntriesImpl = newValue;
            end
        end
    end
    
    properties (SetAccess = private)
        % Backing data structure for CacheEntries.
        CacheEntriesImpl = [];
        
        % A list of all cache entry Ids for which a Writer object
        % currently exists. Ids are added by openEntryForWrite and removed
        % by closeEntry.
        %
        % Note, this is independent of partition index as a single worker
        % is not expected to write to multiple partitions of the cache
        % simultaneously.
        OpenForWriteIds = string.empty(0, 1);
        
        % An index into the current stage of execution.
        %
        % This monotonically increases. It exists so that cache entries
        % generated by an execution stage do not push out other entries
        % of that same stage. This is to prevent cache cycling, i.e.
        % partition N should not push partition 1 of the same array out of
        % the cache.
        StageIndex = 0;
        
        % The total size, in bytes, of all cache entries associated with
        % the current stage.
        StageSize = 0;
    end
    
    methods (Access = protected)
        % The main constructor.
        %
        % maxSizeInBytes - The value to use for MaxSize.
        function obj = CacheStore(maxSizeInBytes)
            obj.MaxSize = maxSizeInBytes;
        end
    end
    
    methods (Sealed)
        % Open or create a cache entry for the given cache entry key and
        % partition index.
        %
        % The stores in objs are visited in order. Each store that does not
        % hold the entry is asked for a writer; the visit stops at the
        % first store that can provide a reader.
        %
        % Both the reader and writer outputs can be empty or non-empty.
        %
        % Outputs:
        %  - reader: Either an implementation of the Reader interface or
        %  empty. This is non-empty if the corresponding cache entry exists
        %  in any of the CacheStore objects.
        %
        %  - writer: Either an array of writer objects or empty. This is
        %  non-empty if the corresponding cache entry does not exist in any
        %  of the CacheStore objects and will write to all CacheStore
        %  objects. This is also non-empty if the cache entry exists but
        %  only in one of the slower CacheStore objects. In such cases, the
        %  writers write to the faster CacheStore objects as a way of
        %  upgrading the cache entry.
        %
        function [reader, writer] = openOrCreateEntry(objs, cacheEntryKey, partitionIndex)
            % Initialize both outputs so an empty objs array still returns
            % valid (empty) outputs.
            reader = [];
            writer = [];
            for obj = objs(:)'
                reader = obj.openEntryForRead(cacheEntryKey.Id, partitionIndex);
                if ~isempty(reader)
                    break;
                end
                writer = [writer; obj.openEntryForWrite(cacheEntryKey.Id, partitionIndex)]; %#ok<AGROW>
            end
        end
        
        % Increment the stage index. This allows subsequent cache entry
        % creation to override existing cache entries.
        function nextStage(objs)
            for obj = objs(:)'
                obj.StageIndex = obj.StageIndex + 1;
                obj.StageSize = 0;
            end
        end
        
        % Copy all entries from one CacheStore to another.
        %
        % Entries already present in destination (same Id and
        % PartitionIndex) are skipped. Copying is best effort: an entry
        % that cannot be copied is left out of destination.
        function copyEntries(source, destination)
            assert(isscalar(source));
            assert(isscalar(destination));
            
            % Iterate over a snapshot because openEntryForRead reorders
            % source.CacheEntries as a side effect.
            sourceEntries = source.CacheEntries;
            for ii = 1:height(sourceEntries)
                id = sourceEntries.Id(ii);
                partitionIndex = sourceEntries.PartitionIndex(ii);
                
                destinationEntries = destination.CacheEntries;
                if any(destinationEntries.Id == id & destinationEntries.PartitionIndex == partitionIndex)
                    continue;
                end
                
                try
                    reader = source.openEntryForRead(id, partitionIndex);
                    writer = destination.openEntryForWrite(id, partitionIndex);
                    % Either can be empty: the entry may no longer be in
                    % source, or a writer for this Id may already be open
                    % in destination.
                    if ~isempty(reader) && ~isempty(writer)
                        while hasdata(reader) && writer.HasEntry
                            writer.add([], read(reader));
                        end
                        writer.commit();
                    end
                catch
                    % Best effort: nothing we can do, so the entry is
                    % simply not copied. NOTE(review): the writer's delete
                    % method is expected to clean up any partially written
                    % data - confirm in the CacheWriter implementations.
                end
                
                % Release this iteration's reader and writer before the
                % next one is opened, so the writer can be closed and
                % another partition of the same Id can be written.
                clear('reader', 'writer');
            end
        end
        
        % Remove all cache entries corresponding with the provided cache
        % entry key.
        function removeEntry(objs, cacheEntryKey)
            for obj = objs(:)'
                obj.doRemoveAll(cacheEntryKey.Id);
                obj.dropRows(obj.CacheEntries.Id == cacheEntryKey.Id);
            end
        end
        
        % Remove all cache entries.
        function removeAllEntries(objs)
            for obj = objs(:)'
                cacheEntryIds = unique(obj.CacheEntries.Id);
                for cacheEntryId = cacheEntryIds(:)'
                    obj.doRemoveAll(cacheEntryId);
                end
                obj.CacheEntries = obj.CacheEntries([], :);
                % No entries remain, so nothing is held for this stage.
                obj.StageSize = 0;
            end
        end
    end
    
    methods (Abstract, Access = protected)
        % The underlying implementation specific pieces of openEntryForRead.
        % This is called with the data from a previous committed cache entry.
        reader = doOpenForRead(obj, data);
        
        % The underlying implementation specific pieces of openEntryForWrite.
        writer = doOpenForWrite(obj, cacheEntryId, partitionIndex);
        
        % Do cleanup for the data of a specific cache entry. This is
        % a hook for the disk cache cleanup.
        doRemove(obj, data);
        
        % Do cleanup of all data associated with a cache entry ID. This is
        % a hook for the disk cache cleanup.
        doRemoveAll(obj, cacheEntryId);
    end
    
    methods (Access = ?matlab.bigdata.internal.io.CacheWriter)
        % Commit a cache entry to the store.
        %
        % This is called by a CacheWriter once an entry has been written.
        % The new entry becomes the most recently used entry (first row of
        % CacheEntries). Then every entry whose cumulative size, counting
        % from the most recently used, exceeds MaxSize is evicted.
        %
        % Inputs:
        %  - cacheEntryId: The Id of the cache entry key.
        %  - partitionIndex: The partition index of the entry.
        %  - data: Implementation specific payload, later passed to
        %  doOpenForRead and doRemove.
        %  - dataSize: The size of the entry in bytes.
        function commitEntry(obj, cacheEntryId, partitionIndex, data, dataSize)
            entries = obj.CacheEntries;
            entries(end + 1, :) = {string(cacheEntryId), partitionIndex, obj.StageIndex, dataSize, {data}};
            % Move the new (last) row to the top as it is now the most
            % recently used entry.
            entries = entries([end, 1 : end - 1], :);
            obj.CacheEntries = entries;
            obj.StageSize = obj.StageSize + dataSize;
            
            isEvicted = cumsum(entries.Size) > obj.MaxSize;
            for ii = find(isEvicted)'
                obj.doRemove(entries.Data{ii});
            end
            obj.dropRows(isEvicted);
        end
        
        % Close an entry.
        %
        % This is called by the writer when finished, regardless of
        % whether a cache entry was committed or not. It allows a new
        % writer to be opened for cacheEntryId.
        function closeEntry(obj, cacheEntryId, partitionIndex) %#ok<INUSD>
            obj.OpenForWriteIds(obj.OpenForWriteIds == cacheEntryId, :) = [];
        end
        
        % Check whether the provided size in bytes will fit in the cache
        % store. Returns true when sizeInBytes is strictly less than the
        % space not yet used by the current stage (MaxSize - StageSize).
        function tf = checkCacheSize(obj, sizeInBytes)
            tf = sizeInBytes < obj.MaxSize - obj.StageSize;
        end
    end
    
    methods (Access = private)
        % Open a cache entry for reading.
        %
        % This returns empty if the entry does not exist in this store.
        % Otherwise the entry is moved into the current stage (if it is
        % not already there) and becomes the most recently used entry.
        function reader = openEntryForRead(obj, cacheEntryId, partitionIndex)
            entries = obj.CacheEntries;
            entryIndex = find(entries.Id == cacheEntryId ...
                & entries.PartitionIndex == partitionIndex, 1);
            
            if isempty(entryIndex)
                reader = [];
                return;
            end
            
            reader = obj.doOpenForRead(entries.Data{entryIndex});
            
            % If the entry hasn't yet been used in the current stage of
            % execution, move it into the current stage.
            if entries.StageIndex(entryIndex) ~= obj.StageIndex
                entries.StageIndex(entryIndex) = obj.StageIndex;
                obj.StageSize = obj.StageSize + entries.Size(entryIndex);
            end
            
            % Move the updated entry in the table to the top of the list as
            % it is now the most recently used.
            obj.CacheEntries = entries([entryIndex, 1 : entryIndex - 1, entryIndex + 1 : end], :);
        end
        
        % Open a cache entry for writing.
        %
        % This returns empty if a writer for cacheEntryId is already open
        % in this store, or if doOpenForWrite returns empty. A non-empty
        % writer has its Id recorded in OpenForWriteIds until closeEntry
        % is called.
        function writer = openEntryForWrite(obj, cacheEntryId, partitionIndex)
            if any(cacheEntryId == obj.OpenForWriteIds)
                writer = [];
                return;
            end
            
            writer = obj.doOpenForWrite(cacheEntryId, partitionIndex);
            if ~isempty(writer)
                obj.OpenForWriteIds(end + 1, 1) = string(cacheEntryId);
            end
        end
        
        % Remove the rows selected by the logical column isDropped from
        % CacheEntries, subtracting the size of any removed current-stage
        % rows from StageSize. This does not call doRemove/doRemoveAll.
        function dropRows(obj, isDropped)
            entries = obj.CacheEntries;
            isDroppedFromStage = isDropped & entries.StageIndex == obj.StageIndex;
            obj.StageSize = obj.StageSize - sum(entries.Size(isDroppedFromStage));
            entries(isDropped, :) = [];
            obj.CacheEntries = entries;
        end
    end
end