package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/blob/PermanentBlobCache.class */
public class PermanentBlobCache extends AbstractBlobCache implements PermanentBlobService {
    private final Map<JobID, RefCount> jobRefCounters;
    private final long cleanupInterval;
    private final Timer cleanupTimer;

    /* loaded from: input_file:org/apache/flink/runtime/blob/PermanentBlobCache$PermanentBlobCleanupTask.class */
    class PermanentBlobCleanupTask extends TimerTask {
        PermanentBlobCleanupTask() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            synchronized (PermanentBlobCache.this.jobRefCounters) {
                Iterator it = PermanentBlobCache.this.jobRefCounters.entrySet().iterator();
                long currentTimeMillis = System.currentTimeMillis();
                while (it.hasNext()) {
                    Map.Entry entry = (Map.Entry) it.next();
                    RefCount refCount = (RefCount) entry.getValue();
                    if (refCount.references <= 0 && refCount.keepUntil > 0 && currentTimeMillis >= refCount.keepUntil) {
                        File file = new File(BlobUtils.getStorageLocationPath(PermanentBlobCache.this.storageDir.getAbsolutePath(), (JobID) entry.getKey()));
                        PermanentBlobCache.this.readWriteLock.writeLock().lock();
                        boolean z = false;
                        try {
                            try {
                                FileUtils.deleteDirectory(file);
                                z = true;
                                PermanentBlobCache.this.readWriteLock.writeLock().unlock();
                            } catch (Throwable th) {
                                PermanentBlobCache.this.log.warn("Failed to locally delete job directory " + file.getAbsolutePath(), th);
                                PermanentBlobCache.this.readWriteLock.writeLock().unlock();
                            }
                            if (z) {
                                it.remove();
                            }
                        } catch (Throwable th2) {
                            PermanentBlobCache.this.readWriteLock.writeLock().unlock();
                            throw th2;
                        }
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/runtime/blob/PermanentBlobCache$RefCount.class */
    public static class RefCount {
        public int references = 0;
        public long keepUntil = -1;

        RefCount() {
        }
    }

    public PermanentBlobCache(Configuration configuration, BlobView blobView, @Nullable InetSocketAddress inetSocketAddress) throws IOException {
        super(configuration, blobView, LoggerFactory.getLogger((Class<?>) PermanentBlobCache.class), inetSocketAddress);
        this.jobRefCounters = new HashMap();
        this.cleanupTimer = new Timer(true);
        this.cleanupInterval = configuration.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
        this.cleanupTimer.schedule(new PermanentBlobCleanupTask(), this.cleanupInterval, this.cleanupInterval);
    }

    public void registerJob(JobID jobID) {
        Preconditions.checkNotNull(jobID);
        synchronized (this.jobRefCounters) {
            RefCount refCount = this.jobRefCounters.get(jobID);
            if (refCount == null) {
                refCount = new RefCount();
                this.jobRefCounters.put(jobID, refCount);
            } else {
                refCount.keepUntil = -1L;
            }
            refCount.references++;
        }
    }

    public void releaseJob(JobID jobID) {
        Preconditions.checkNotNull(jobID);
        synchronized (this.jobRefCounters) {
            RefCount refCount = this.jobRefCounters.get(jobID);
            if (refCount == null || refCount.references == 0) {
                this.log.warn("improper use of releaseJob() without a matching number of registerJob() calls for jobId " + jobID);
                return;
            }
            refCount.references--;
            if (refCount.references == 0) {
                refCount.keepUntil = System.currentTimeMillis() + this.cleanupInterval;
            }
        }
    }

    public int getNumberOfReferenceHolders(JobID jobID) {
        Preconditions.checkNotNull(jobID);
        synchronized (this.jobRefCounters) {
            RefCount refCount = this.jobRefCounters.get(jobID);
            if (refCount == null) {
                return 0;
            }
            return refCount.references;
        }
    }

    @Override // org.apache.flink.runtime.blob.PermanentBlobService
    public File getFile(JobID jobID, PermanentBlobKey permanentBlobKey) throws IOException {
        Preconditions.checkNotNull(jobID);
        return getFileInternal(jobID, permanentBlobKey);
    }

    @VisibleForTesting
    public File getStorageLocation(JobID jobID, BlobKey blobKey) throws IOException {
        Preconditions.checkNotNull(jobID);
        return BlobUtils.getStorageLocation(this.storageDir, jobID, blobKey);
    }

    @VisibleForTesting
    Map<JobID, RefCount> getJobRefCounters() {
        return this.jobRefCounters;
    }

    @Override // org.apache.flink.runtime.blob.AbstractBlobCache
    protected void cancelCleanupTask() {
        this.cleanupTimer.cancel();
    }
}
