/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Queue consumer that imports {@link AtlasEntity.AtlasEntityWithExtInfo} items through the
 * bulk entity store and commits the bulk graph once a batch threshold is reached.
 *
 * <p>Flow visible in this class:
 * <ul>
 *   <li>Each item is first written (without commit) via {@code entityStoreBulk} and remembered
 *       in {@code entityBuffer} so it can be replayed after a rollback.</li>
 *   <li>When the running batch size reaches {@code batchSize}, the bulk graph is committed,
 *       retrying up to {@link #MAX_COMMIT_RETRY_COUNT} times; each failed attempt rolls back,
 *       sleeps, and replays the buffered entities.</li>
 *   <li>If the bulk store rejects an item with {@link IllegalArgumentException} or
 *       {@link IllegalStateException}, the already buffered entities are rolled back,
 *       replayed and committed, and the offending item is imported through the regular
 *       (non-bulk) store instead.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code entityBuffer} and {@code localResults} are plain {@link ArrayList}s
 * with no synchronization in this class; presumably each instance is driven by exactly one
 * worker thread - confirm against {@code WorkItemConsumer}.
 */
public class EntityConsumer
extends WorkItemConsumer<AtlasEntity.AtlasEntityWithExtInfo> {
    private static final Logger LOG = LoggerFactory.getLogger(EntityConsumer.class);

    /** Maximum number of bulk-graph commit attempts made by {@link #doCommit()}. */
    private static final int MAX_COMMIT_RETRY_COUNT = 3;

    /** Batch-size threshold at which {@link #attemptCommit()} triggers a commit. */
    private final int batchSize;
    /** Running count of entities (including referred entities) seen since the last {@link #commitDirty()}. */
    private final AtomicLong counter = new AtomicLong(1L);
    /** Count of entities (including referred entities) accumulated since the last {@link #clear()}. */
    private final AtomicLong currentBatch = new AtomicLong(1L);
    /** Graph used, together with {@link #entityStore}, for the regular (non-bulk) fallback import. */
    private final AtlasGraph atlasGraph;
    private final AtlasEntityStore entityStore;
    /** Graph used, together with {@link #entityStoreBulk}, for the bulk import path. */
    private final AtlasGraph atlasGraphBulk;
    private final AtlasEntityStore entityStoreBulk;
    private final AtlasTypeRegistry typeRegistry;
    private final EntityGraphRetriever entityRetrieverBulk;
    /** Entities written to the bulk store but not yet committed; replayed after a rollback. */
    private final List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new ArrayList<AtlasEntity.AtlasEntityWithExtInfo>();
    /** GUIDs of entities awaiting publication through {@code addResult} after a successful commit. */
    private final List<String> localResults = new ArrayList<String>();

    /**
     * Creates a consumer bound to the given stores and graphs.
     *
     * @param typeRegistry        type registry handed to {@code BulkImporterImpl.updateVertexGuid}
     * @param atlasGraph          graph used for the regular (non-bulk) fallback import
     * @param entityStore         entity store used for the regular (non-bulk) fallback import
     * @param atlasGraphBulk      graph used for the bulk import path
     * @param entityStoreBulk     entity store used for the bulk import path
     * @param entityRetrieverBulk retriever handed to {@code BulkImporterImpl.updateVertexGuid}
     * @param queue               queue passed to the superclass constructor (kept as a raw type
     *                            so existing callers remain source compatible)
     * @param batchSize           number of entities after which the bulk graph is committed
     */
    public EntityConsumer(AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AtlasEntityStore entityStore, AtlasGraph atlasGraphBulk, AtlasEntityStore entityStoreBulk, EntityGraphRetriever entityRetrieverBulk, BlockingQueue queue, int batchSize) {
        super(queue);
        this.typeRegistry = typeRegistry;
        this.atlasGraph = atlasGraph;
        this.entityStore = entityStore;
        this.atlasGraphBulk = atlasGraphBulk;
        this.entityStoreBulk = entityStoreBulk;
        this.entityRetrieverBulk = entityRetrieverBulk;
        this.batchSize = batchSize;
    }

    /**
     * Imports one queued item and commits the bulk graph if the batch threshold was reached.
     * Any exception is logged and not propagated.
     *
     * @param entityWithExtInfo the entity, plus any referred entities, to import
     */
    protected void processItem(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
        // The entity itself counts as one; every referred entity adds one more.
        Map<?, ?> referredEntities = entityWithExtInfo.getReferredEntities();
        int delta = MapUtils.isEmpty(referredEntities) ? 1 : referredEntities.size() + 1;
        long currentCount = counter.addAndGet(delta);
        currentBatch.addAndGet(delta);
        try {
            processEntity(entityWithExtInfo, currentCount);
            attemptCommit();
        }
        catch (Exception e) {
            LOG.info("Invalid entities. Possible data loss: Please correct and re-submit!", e);
        }
    }

    /**
     * Writes a single entity through the bulk store, handling the failure modes visible here:
     * illegal argument/state falls back to the regular store, {@link AtlasBaseException} is
     * logged and the entity is skipped, and a schema violation triggers
     * {@code BulkImporterImpl.updateVertexGuid}.
     *
     * @param entityWithExtInfo entity to import
     * @param currentCount      running counter value, used only for debug logging
     */
    private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) {
        RequestContext.get().setImportInProgress(true);
        RequestContext.get().setCreateShellEntityForNonExistingReference(true);
        try {
            LOG.debug("Processing: {}", currentCount);
            importUsingBulkEntityStore(entityWithExtInfo);
        }
        catch (IllegalArgumentException | IllegalStateException e) {
            LOG.warn("{}: {} - {}", e.getClass().getSimpleName(), entityWithExtInfo.getEntity().getTypeName(), entityWithExtInfo.getEntity().getGuid(), e);
            importUsingRegularEntityStore(entityWithExtInfo, e);
        }
        catch (AtlasBaseException e) {
            LOG.warn("AtlasBaseException: {} - {}", entityWithExtInfo.getEntity().getTypeName(), entityWithExtInfo.getEntity().getGuid(), e);
        }
        catch (AtlasSchemaViolationException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Entity: {}", entityWithExtInfo.getEntity().getGuid(), e);
            }
            BulkImporterImpl.updateVertexGuid(atlasGraphBulk, typeRegistry, entityRetrieverBulk, entityWithExtInfo.getEntity());
        }
    }

    /**
     * Writes the entity through the bulk store without committing, then records its GUID as a
     * pending result and buffers the entity for possible replay.
     *
     * @param entityWithExtInfo entity to import
     * @throws AtlasBaseException if the bulk store rejects the entity
     */
    private void importUsingBulkEntityStore(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
        AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
        // The mutation response is not needed here; only the pending GUID and the buffer are tracked.
        entityStoreBulk.createOrUpdateForImportNoCommit(oneEntityStream);
        localResults.add(entityWithExtInfo.getEntity().getGuid());
        entityBuffer.add(entityWithExtInfo);
    }

    /**
     * Fallback path: rolls back, replays and commits what is already buffered on the bulk
     * graph, then imports the given entity through the regular store.
     *
     * @param entityWithExtInfo entity that the bulk store rejected
     * @param ex                exception raised by the bulk store for this entity
     */
    private void importUsingRegularEntityStore(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, Exception ex) {
        commitValidatedEntities(ex);
        performRegularImport(entityWithExtInfo);
    }

    /**
     * Imports one entity through the regular entity store and commits the regular graph,
     * holding the monitor of {@code atlasGraph} for the duration. On failure the regular graph
     * is rolled back and the error is logged; it is not propagated by the catch block.
     *
     * @param entityWithExtInfo entity to import
     */
    private void performRegularImport(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
        synchronized (atlasGraph) {
            try {
                LOG.info("Regular: EntityStore: {}: Starting...", counter.get());
                AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
                entityStore.createOrUpdateForImportNoCommit(oneEntityStream);
                atlasGraph.commit();
                localResults.add(entityWithExtInfo.getEntity().getGuid());
                dispatchResults();
            }
            catch (Exception e) {
                atlasGraph.rollback();
                LOG.error("Regular: EntityStore: Rollback!: Entity creation using regular (non-bulk) failed! Please correct entity and re-submit!", e);
            }
            finally {
                LOG.info("Regular: EntityStore: {}: Commit: Done!", counter.get());
                // NOTE(review): this commit also runs after the rollback in the catch block, and
                // the GUID below is published even when the import failed (and a second time
                // when it succeeded, since dispatchResults() already published it). Kept as-is;
                // confirm whether this is intended.
                atlasGraph.commit();
                addResult(entityWithExtInfo.getEntity().getGuid());
                clear();
                LOG.info("Regular: EntityStore: {}: Done!", counter.get());
            }
        }
    }

    /**
     * Rolls back the bulk graph, replays the buffered entities, and commits them.
     *
     * @param ex exception that triggered the fallback; used for logging in
     *           {@link #rollbackPauseRetry(int, Exception)}
     */
    private void commitValidatedEntities(Exception ex) {
        try {
            LOG.info("Validated Entities: Commit: Starting...");
            rollbackPauseRetry(1, ex);
            doCommit();
        }
        finally {
            LOG.info("Validated Entities: Commit: Done!");
        }
    }

    /** Commits the bulk graph once the current batch has reached {@code batchSize}. */
    private void attemptCommit() {
        if (currentBatch.get() >= (long) batchSize) {
            doCommit();
        }
    }

    /**
     * Commits the bulk graph, making up to {@link #MAX_COMMIT_RETRY_COUNT} attempts. If every
     * attempt fails, the failure is logged and the buffered entities and pending results are
     * discarded.
     */
    protected void doCommit() {
        for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) {
            if (commitWithRetry(retryCount)) {
                return;
            }
        }
        LOG.error("Retries exceeded! Potential data loss! Please correct data and re-attempt. Buffer: {}: Counter: {}", entityBuffer.size(), counter.get());
        clear();
    }

    /**
     * Delegates to the superclass {@code commitDirty()}, then logs the running total and
     * resets the counter to zero.
     */
    protected void commitDirty() {
        super.commitDirty();
        LOG.info("Total: Commit: {}", counter.get());
        counter.set(0L);
    }

    /**
     * Performs one commit attempt on the bulk graph. On success the pending results are
     * published; on failure the graph is rolled back, the thread pauses, and the buffered
     * entities are replayed.
     *
     * @param retryCount 1-based attempt number; also scales the pause after a failure
     * @return {@code true} if the commit succeeded, {@code false} otherwise
     */
    private boolean commitWithRetry(int retryCount) {
        AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("commitWithRetry");
        try {
            atlasGraphBulk.commit();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Commit: Done!: Buffer: {}: Batch: {}: Counter: {}", entityBuffer.size(), currentBatch.get(), counter.get());
            }
            dispatchResults();
            return true;
        }
        catch (Exception ex) {
            rollbackPauseRetry(retryCount, ex);
            return false;
        }
        finally {
            RequestContext.get().endMetricRecord(metric);
        }
    }

    /**
     * Rolls back the bulk graph, pauses, and replays the buffered entities.
     *
     * @param retryCount 1-based attempt number; scales the pause and appears in log output
     * @param ex         exception that caused the rollback
     */
    private void rollbackPauseRetry(int retryCount, Exception ex) {
        bulkGraphRollback(retryCount);
        LOG.warn("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
        pause(retryCount);
        // These two exception types are matched by simple name and deliberately not logged in detail.
        String exceptionClass = ex.getClass().getSimpleName();
        boolean skipDetailedLog = exceptionClass.equals("JanusGraphException") || exceptionClass.equals("PermanentLockingException");
        if (!skipDetailedLog) {
            LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex);
        }
        retryProcessEntity(retryCount);
    }

    /**
     * Rolls back the bulk graph and clears the caches. A failure during rollback is logged,
     * including its cause, and otherwise ignored so that the retry sequence can continue.
     *
     * @param retryCount 1-based attempt number, used only for logging
     */
    private void bulkGraphRollback(int retryCount) {
        try {
            atlasGraphBulk.rollback();
            clearCache();
        }
        catch (Exception e) {
            // The trailing throwable has no placeholder, so SLF4J logs it with its stack trace.
            LOG.error("Rollback: Exception! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, e);
        }
    }

    /**
     * Replays every buffered entity through {@link #processEntity}. The buffer is snapshotted
     * and cleared first because a successful replay appends each entity to it again.
     *
     * @param retryCount 1-based attempt number, used only for logging
     */
    private void retryProcessEntity(int retryCount) {
        if (LOG.isDebugEnabled() || retryCount > 1) {
            LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
        }
        List<AtlasEntity.AtlasEntityWithExtInfo> localBuffer = new ArrayList<AtlasEntity.AtlasEntityWithExtInfo>(entityBuffer);
        entityBuffer.clear();
        for (AtlasEntity.AtlasEntityWithExtInfo e : localBuffer) {
            processEntity(e, counter.get());
        }
        LOG.info("Replaying: Done!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
    }

    /** Publishes every pending GUID through {@code addResult}, then resets the batch state. */
    private void dispatchResults() {
        for (String guid : localResults) {
            addResult(guid);
        }
        clear();
    }

    /**
     * Sleeps for {@code retryCount} seconds. If the thread is interrupted while sleeping, the
     * interruption is logged and the interrupt flag is restored so callers can observe it.
     *
     * @param retryCount 1-based attempt number; the pause is {@code 1000 * retryCount} ms
     */
    private void pause(int retryCount) {
        try {
            Thread.sleep(1000L * retryCount);
        }
        catch (InterruptedException e) {
            LOG.error("pause: Interrupted!", e);
            // Catching InterruptedException clears the flag; set it again instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /** Discards pending results and buffered entities, clears the caches, and resets the batch counter. */
    private void clear() {
        localResults.clear();
        entityBuffer.clear();
        clearCache();
        currentBatch.set(0L);
    }

    /** Clears the {@link GraphTransactionInterceptor} cache and the current {@link RequestContext} cache. */
    private void clearCache() {
        GraphTransactionInterceptor.clearCache();
        RequestContext.get().clearCache();
    }
}

