flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorage.java
/** * CheckpointStorage implements the durable storage of checkpoint data and metadata streams. * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation}, * created by this class. */ public interface CheckpointStorage { boolean supportsHighlyAvailableStorage(); boolean hasDefaultSavepointLocation(); CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException; CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException; CheckpointStorageLocation initializeLocationForSavepoint( long checkpointId, @Nullable String externalLocationPointer) throws IOException; CheckpointStreamFactory resolveCheckpointStorageLocation( long checkpointId, CheckpointStorageLocationReference reference) throws IOException; CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException; } 复制代码
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
/** * An implementation of durable checkpoint storage to file systems. */ public abstract class AbstractFsCheckpointStorage implements CheckpointStorage { // ------------------------------------------------------------------------ // Constants // ------------------------------------------------------------------------ /** The prefix of the directory containing the data exclusive to a checkpoint. */ public static final String CHECKPOINT_DIR_PREFIX = "chk-"; /** The name of the directory for shared checkpoint state. */ public static final String CHECKPOINT_SHARED_STATE_DIR = "shared"; /** The name of the directory for state not owned/released by the master, but by the TaskManagers. */ public static final String CHECKPOINT_TASK_OWNED_STATE_DIR = "taskowned"; /** The name of the metadata files in checkpoints / savepoints. */ public static final String METADATA_FILE_NAME = "_metadata"; /** The magic number that is put in front of any reference. */ private static final byte[] REFERENCE_MAGIC_NUMBER = new byte[] { 0x05, 0x5F, 0x3F, 0x18 }; // ------------------------------------------------------------------------ // Fields and properties // ------------------------------------------------------------------------ /** The jobId, written into the generated savepoint directories. */ private final JobID jobId; /** The default location for savepoints. Null, if none is configured. */ @Nullable private final Path defaultSavepointDirectory; @Override public boolean hasDefaultSavepointLocation() { return defaultSavepointDirectory != null; } @Override public CompletedCheckpointStorageLocation resolveCheckpoint(String checkpointPointer) throws IOException { return resolveCheckpointPointer(checkpointPointer); } /** * Creates a file system based storage location for a savepoint. * * <p>This methods implements the logic that decides which location to use (given optional * parameters for a configured location and a location passed for this specific savepoint) * and how to name and initialize the savepoint directory. * * @param externalLocationPointer The target location pointer for the savepoint. * Must be a valid URI. Null, if not supplied. * @param checkpointId The checkpoint ID of the savepoint. * * @return The checkpoint storage location for the savepoint. * * @throws IOException Thrown if the target directory could not be created. */ @Override public CheckpointStorageLocation initializeLocationForSavepoint( @SuppressWarnings("unused") long checkpointId, @Nullable String externalLocationPointer) throws IOException { // determine where to write the savepoint to final Path savepointBasePath; if (externalLocationPointer != null) { savepointBasePath = new Path(externalLocationPointer); } else if (defaultSavepointDirectory != null) { savepointBasePath = defaultSavepointDirectory; } else { throw new IllegalArgumentException("No savepoint location given and no default location configured."); } // generate the savepoint directory final FileSystem fs = savepointBasePath.getFileSystem(); final String prefix = "savepoint-" + jobId.toString().substring(0, 6) + '-'; Exception latestException = null; for (int attempt = 0; attempt < 10; attempt++) { final Path path = new Path(savepointBasePath, FileUtils.getRandomFilename(prefix)); try { if (fs.mkdirs(path)) { // we make the path qualified, to make it independent of default schemes and authorities final Path qp = path.makeQualified(fs); return createSavepointLocation(fs, qp); } } catch (Exception e) { latestException = e; } } throw new IOException("Failed to create savepoint directory at " + savepointBasePath, latestException); } protected abstract CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException; //...... } 复制代码
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
/** * An implementation of durable checkpoint storage to file systems. */ public class FsCheckpointStorage extends AbstractFsCheckpointStorage { private final FileSystem fileSystem; private final Path checkpointsDirectory; private final Path sharedStateDirectory; private final Path taskOwnedStateDirectory; private final int fileSizeThreshold; public FsCheckpointStorage( Path checkpointBaseDirectory, @Nullable Path defaultSavepointDirectory, JobID jobId, int fileSizeThreshold) throws IOException { this(checkpointBaseDirectory.getFileSystem(), checkpointBaseDirectory, defaultSavepointDirectory, jobId, fileSizeThreshold); } public FsCheckpointStorage( FileSystem fs, Path checkpointBaseDirectory, @Nullable Path defaultSavepointDirectory, JobID jobId, int fileSizeThreshold) throws IOException { super(jobId, defaultSavepointDirectory); checkArgument(fileSizeThreshold >= 0); this.fileSystem = checkNotNull(fs); this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointBaseDirectory, jobId); this.sharedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_SHARED_STATE_DIR); this.taskOwnedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_TASK_OWNED_STATE_DIR); this.fileSizeThreshold = fileSizeThreshold; // initialize the dedicated directories fileSystem.mkdirs(checkpointsDirectory); fileSystem.mkdirs(sharedStateDirectory); fileSystem.mkdirs(taskOwnedStateDirectory); } // ------------------------------------------------------------------------ public Path getCheckpointsDirectory() { return checkpointsDirectory; } // ------------------------------------------------------------------------ // CheckpointStorage implementation // ------------------------------------------------------------------------ @Override public boolean supportsHighlyAvailableStorage() { return true; } @Override public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException { checkArgument(checkpointId >= 0); // prepare all the paths needed for the checkpoints final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId); // create the checkpoint exclusive directory fileSystem.mkdirs(checkpointDir); return new FsCheckpointStorageLocation( fileSystem, checkpointDir, sharedStateDirectory, taskOwnedStateDirectory, CheckpointStorageLocationReference.getDefault(), fileSizeThreshold); } @Override public CheckpointStreamFactory resolveCheckpointStorageLocation( long checkpointId, CheckpointStorageLocationReference reference) throws IOException { if (reference.isDefaultReference()) { // default reference, construct the default location for that particular checkpoint final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId); return new FsCheckpointStorageLocation( fileSystem, checkpointDir, sharedStateDirectory, taskOwnedStateDirectory, reference, fileSizeThreshold); } else { // location encoded in the reference final Path path = decodePathFromReference(reference); return new FsCheckpointStorageLocation( path.getFileSystem(), path, path, path, reference, fileSizeThreshold); } } @Override public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException { return new FsCheckpointStateOutputStream( taskOwnedStateDirectory, fileSystem, FsCheckpointStreamFactory.DEFAULT_WRITE_BUFFER_SIZE, fileSizeThreshold); } @Override protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException { final CheckpointStorageLocationReference reference = encodePathAsReference(location); return new FsCheckpointStorageLocation(fs, location, location, location, reference, fileSizeThreshold); } } 复制代码