EntandoBundleInstallService.java
package org.entando.kubernetes.service.digitalexchange.job;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.entando.kubernetes.client.model.AnalysisReport;
import org.entando.kubernetes.client.model.assembler.InstallPlanAssembler;
import org.entando.kubernetes.controller.digitalexchange.job.model.InstallAction;
import org.entando.kubernetes.controller.digitalexchange.job.model.InstallPlan;
import org.entando.kubernetes.exception.EntandoComponentManagerException;
import org.entando.kubernetes.exception.digitalexchange.ReportAnalysisException;
import org.entando.kubernetes.model.bundle.ComponentType;
import org.entando.kubernetes.model.bundle.descriptor.Descriptor;
import org.entando.kubernetes.model.bundle.downloader.BundleDownloader;
import org.entando.kubernetes.model.bundle.downloader.BundleDownloaderFactory;
import org.entando.kubernetes.model.bundle.installable.Installable;
import org.entando.kubernetes.model.bundle.processor.ComponentProcessor;
import org.entando.kubernetes.model.bundle.reader.BundleReader;
import org.entando.kubernetes.model.bundle.reportable.AnalysisReportFunction;
import org.entando.kubernetes.model.bundle.reportable.Reportable;
import org.entando.kubernetes.model.bundle.reportable.ReportableComponentProcessor;
import org.entando.kubernetes.model.bundle.reportable.ReportableRemoteHandler;
import org.entando.kubernetes.model.debundle.EntandoDeBundle;
import org.entando.kubernetes.model.debundle.EntandoDeBundleTag;
import org.entando.kubernetes.model.job.EntandoBundleComponentJobEntity;
import org.entando.kubernetes.model.job.EntandoBundleEntity;
import org.entando.kubernetes.model.job.EntandoBundleJobEntity;
import org.entando.kubernetes.model.job.JobProgress;
import org.entando.kubernetes.model.job.JobResult;
import org.entando.kubernetes.model.job.JobScheduler;
import org.entando.kubernetes.model.job.JobStatus;
import org.entando.kubernetes.model.job.JobTracker;
import org.entando.kubernetes.repository.EntandoBundleComponentJobRepository;
import org.entando.kubernetes.repository.EntandoBundleJobRepository;
import org.entando.kubernetes.repository.InstalledEntandoBundleRepository;
import org.entando.kubernetes.service.digitalexchange.BundleUtilities;
import org.entando.kubernetes.service.digitalexchange.component.EntandoBundleService;
import org.entando.kubernetes.service.digitalexchange.concurrency.BundleOperationsConcurrencyManager;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class EntandoBundleInstallService implements EntandoBundleJobExecutor {
public static final boolean PERFORM_CONCURRENT_CHECKS = true;
public static final boolean DONT_PERFORM_CONCURRENT_CHECKS = false;
private final @NonNull EntandoBundleService bundleService;
private final @NonNull BundleDownloaderFactory downloaderFactory;
private final @NonNull EntandoBundleJobRepository jobRepo;
private final @NonNull EntandoBundleComponentJobRepository compJobRepo;
private final @NonNull InstalledEntandoBundleRepository bundleRepository;
private final @NonNull Map<ComponentType, ComponentProcessor<?>> processorMap;
private final @NonNull List<ReportableComponentProcessor> reportableComponentProcessorList;
private final @NonNull Map<ReportableRemoteHandler, AnalysisReportFunction> analysisReportStrategies;
private final @NonNull BundleOperationsConcurrencyManager bundleOperationsConcurrencyManager;
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* perform the install analysis if there isn't another running bundle operation.
* return the InstallPlan generated by the result of the analysis
*
* @param bundle the bundle to analyze
* @param tag the tag of the bundle to analyze
* @param performConcurrencyCheck if true it check for possible concurrent operations
* @return the generated {@link InstallPlan}
*/
public InstallPlan generateInstallPlan(EntandoDeBundle bundle, EntandoDeBundleTag tag,
boolean performConcurrencyCheck) {
if (performConcurrencyCheck) {
this.bundleOperationsConcurrencyManager.throwIfAnotherOperationIsRunningOrStartOperation();
}
InstallPlan installPlan;
BundleDownloader bundleDownloader = downloaderFactory.newDownloader();
try {
BundleReader bundleReader = this.downloadBundleAndGetBundleReader(bundleDownloader, bundle, tag);
Map<ReportableRemoteHandler, List<Reportable>> reportableByHandler =
this.getReportableComponentsByRemoteHandler(bundleReader);
List<CompletableFuture<AnalysisReport>> futureList = reportableByHandler.keySet().stream()
// for each remote handler => get whole analysis report async
.map(key -> CompletableFuture.supplyAsync(() -> analysisReportStrategies.get(key)
.getAnalysisReport(reportableByHandler.get(key))))
.collect(Collectors.toList());
// why using separate streams https://stackoverflow.com/questions/58700578/why-is-completablefuture-join-get-faster-in-separate-streams-than-using-one-stre
try {
installPlan = futureList.stream().map(CompletableFuture::join)
.map(InstallPlanAssembler::toInstallPlan)
.reduce(InstallPlan::merge)
.orElseThrow(() -> new ReportAnalysisException(String.format(
"An error occurred during the install plan generation for the bundle %s with tag %s",
bundle.getMetadata().getName(), tag.getVersion())));
} catch (CompletionException e) {
throw e.getCause() instanceof ReportAnalysisException
? (ReportAnalysisException) e.getCause()
: e;
}
} finally {
if (performConcurrencyCheck) {
this.bundleOperationsConcurrencyManager.operationTerminated();
}
bundleDownloader.cleanTargetDirectory();
}
return installPlan;
}
public EntandoBundleJobEntity install(EntandoDeBundle bundle, EntandoDeBundleTag tag) {
return this.install(bundle, tag, InstallAction.CREATE);
}
public EntandoBundleJobEntity install(EntandoDeBundle bundle, EntandoDeBundleTag tag,
InstallAction conflictStrategy) {
this.bundleOperationsConcurrencyManager.throwIfAnotherOperationIsRunningOrStartOperation();
try {
// Only request analysis report if provided conflict strategy
final InstallPlan installPlan = conflictStrategy != InstallAction.CREATE
? generateInstallPlan(bundle, tag, EntandoBundleInstallService.DONT_PERFORM_CONCURRENT_CHECKS)
: new InstallPlan();
EntandoBundleJobEntity job = createInstallJob(bundle, tag, installPlan);
submitInstallAsync(job, bundle, tag, conflictStrategy, installPlan)
.thenAccept(unused -> this.bundleOperationsConcurrencyManager.operationTerminated());
return job;
} catch (Exception e) {
// release concurrency manager's lock
this.bundleOperationsConcurrencyManager.operationTerminated();
throw e;
}
}
public EntandoBundleJobEntity installWithInstallPlan(EntandoDeBundle bundle, EntandoDeBundleTag tag,
InstallPlan installPlan) {
this.bundleOperationsConcurrencyManager.throwIfAnotherOperationIsRunningOrStartOperation();
try {
EntandoBundleJobEntity job = createInstallJob(bundle, tag, installPlan);
submitInstallAsync(job, bundle, tag, InstallAction.CREATE, installPlan)
.thenAccept(unused -> this.bundleOperationsConcurrencyManager.operationTerminated());
return job;
} catch (Exception e) {
// release concurrency manager's lock
this.bundleOperationsConcurrencyManager.operationTerminated();
throw e;
}
}
private EntandoBundleJobEntity createInstallJob(EntandoDeBundle bundle, EntandoDeBundleTag tag,
InstallPlan installPlan) {
final EntandoBundleJobEntity job = new EntandoBundleJobEntity();
job.setComponentId(bundle.getMetadata().getName());
job.setComponentName(bundle.getSpec().getDetails().getName());
job.setComponentVersion(tag.getVersion());
job.setProgress(0);
job.setStatus(JobStatus.INSTALL_CREATED);
if (installPlan != null) {
job.setCustomInstallation(installPlan.isCustomInstallation());
try {
job.setInstallPlan(objectMapper.writeValueAsString(installPlan));
} catch (JsonProcessingException e) {
log.error("Error converting the received install plan to string", e);
job.setInstallPlan(null);
}
}
EntandoBundleJobEntity createdJob = jobRepo.save(job);
log.debug("New installation job created " + job.toString());
createdJob.getComponentId();
return createdJob;
}
private CompletableFuture<Void> submitInstallAsync(EntandoBundleJobEntity parentJob, EntandoDeBundle bundle,
EntandoDeBundleTag tag, InstallAction conflictStrategy, InstallPlan installPlan) {
return CompletableFuture.runAsync(() -> {
log.info("Started new install job for component " + parentJob.getComponentId() + "@" + tag.getVersion());
JobTracker<EntandoBundleJobEntity> parentJobTracker = new JobTracker<>(parentJob, jobRepo);
JobResult parentJobResult = JobResult.builder().build();
JobScheduler scheduler = new JobScheduler();
BundleDownloader bundleDownloader = downloaderFactory.newDownloader();
parentJobTracker.startTracking(JobStatus.INSTALL_IN_PROGRESS);
try {
Queue<Installable> bundleInstallableComponents = getBundleInstallableComponents(bundle, tag,
bundleDownloader, conflictStrategy, installPlan);
Queue<EntandoBundleComponentJobEntity> componentJobQueue = bundleInstallableComponents.stream()
.map(i -> {
EntandoBundleComponentJobEntity cj = new EntandoBundleComponentJobEntity();
cj.setParentJob(parentJob);
cj.setComponentType(i.getComponentType());
cj.setComponentId(i.getName());
cj.setChecksum(i.getChecksum());
cj.setInstallable(i);
cj.setAction(i.getAction());
return cj;
})
.collect(Collectors.toCollection(ArrayDeque::new));
scheduler.queueAll(componentJobQueue);
JobProgress installProgress = new JobProgress(1.0 / componentJobQueue.size());
Optional<EntandoBundleComponentJobEntity> optCompJob = scheduler.extractFromQueue();
while (optCompJob.isPresent()) {
EntandoBundleComponentJobEntity installJob = optCompJob.get();
JobTracker<EntandoBundleComponentJobEntity> tracker = trackExecution(installJob,
this::executeInstall);
scheduler.recordProcessedComponentJob(tracker.getJob());
if (tracker.getJob().getStatus().equals(JobStatus.INSTALL_ERROR)) {
parentJobResult.setInstallException(new EntandoComponentManagerException(
tracker.getJob().getInstallErrorMessage()));
break;
}
installProgress.increment();
parentJobTracker.setProgress(installProgress.getValue());
optCompJob = scheduler.extractFromQueue();
}
if (parentJobResult.hasException()) {
log.error("An error occurred during component instEntandoBundleJobEntityallation --- ", parentJobResult.getInstallError());
log.warn("Rolling installation of bundle " + parentJob.getComponentId() + "@" + parentJob
.getComponentVersion());
parentJobResult = rollback(scheduler, parentJobResult);
} else {
saveAsInstalledBundle(bundle, parentJob);
parentJobResult.clearException();
parentJobResult.setStatus(JobStatus.INSTALL_COMPLETED);
parentJobResult.setProgress(1.0);
log.info("Bundle installed correctly");
}
} catch (Exception e) {
log.error("An error occurred while reading components from the bundle", e);
parentJobResult.setStatus(JobStatus.INSTALL_ERROR);
parentJobResult.setInstallException(e);
}
parentJobTracker.finishTracking(parentJobResult);
bundleDownloader.cleanTargetDirectory();
});
}
private JobResult rollback(JobScheduler scheduler, JobResult result) {
JobScheduler rollbackScheduler = scheduler.createRollbackScheduler();
try {
Optional<EntandoBundleComponentJobEntity> optCompJob = rollbackScheduler.extractFromQueue();
while (optCompJob.isPresent()) {
EntandoBundleComponentJobEntity rollbackJob = optCompJob.get();
if (isUninstallable(rollbackJob)) {
JobTracker<EntandoBundleComponentJobEntity> tracker = trackExecution(rollbackJob,
this::executeRollback);
if (tracker.getJob().getStatus().equals(JobStatus.INSTALL_ROLLBACK_ERROR)) {
throw new EntandoComponentManagerException(
rollbackJob.getComponentType() + " " + rollbackJob.getComponentId()
+ " rollback can't proceed due to an error with one of the components");
}
rollbackScheduler.recordProcessedComponentJob(tracker.getJob());
}
optCompJob = rollbackScheduler.extractFromQueue();
}
log.info("Rollback operation completed successfully");
result.setStatus(JobStatus.INSTALL_ROLLBACK);
} catch (Exception rollbackException) {
log.error("An error occurred during component rollback", rollbackException);
result.setStatus(JobStatus.INSTALL_ERROR);
result.setRollbackException(rollbackException);
}
return result;
}
/**
* download the bundle, create a BundleReader to read it and return it.
*
* @param bundleDownloader the BundleDownloader responsible to download the desired bundle
* @param bundle the object defining the bundle to download
* @param tag the object defining the version of the bundle to download
* @return the created BundleReader ready to read the bundle
*/
private BundleReader downloadBundleAndGetBundleReader(BundleDownloader bundleDownloader, EntandoDeBundle bundle,
EntandoDeBundleTag tag) {
Path pathToDownloadedBundle = bundleDownloader.saveBundleLocally(bundle, tag);
return new BundleReader(pathToDownloadedBundle);
}
private Queue<Installable> getBundleInstallableComponents(EntandoDeBundle bundle, EntandoDeBundleTag tag,
BundleDownloader bundleDownloader, InstallAction conflictStrategy, InstallPlan installPlan) {
BundleReader bundleReader = this.downloadBundleAndGetBundleReader(bundleDownloader, bundle, tag);
return getInstallableComponentsByPriority(bundleReader, conflictStrategy, installPlan);
}
private JobTracker<EntandoBundleComponentJobEntity> trackExecution(EntandoBundleComponentJobEntity job,
Function<Installable, JobResult> action) {
JobTracker<EntandoBundleComponentJobEntity> componentJobTracker = new JobTracker<>(job, compJobRepo);
componentJobTracker.startTracking(JobStatus.INSTALL_IN_PROGRESS);
JobResult result = action.apply(job.getInstallable());
componentJobTracker.finishTracking(result);
return componentJobTracker;
}
/**
* execute every ReportableProcessor to extract the relative Reportable from the descriptor and return it.
*
* @param bundleReader the BUndleReader to use to read the bundle
* @return a List of Reportable extracted from the bundle components descriptors
*/
private Map<ReportableRemoteHandler, List<Reportable>> getReportableComponentsByRemoteHandler(
BundleReader bundleReader) {
return reportableComponentProcessorList.stream()
.map(reportableProcessor ->
reportableProcessor.getReportable(bundleReader, (ComponentProcessor<?>) reportableProcessor))
.collect(Collectors.groupingBy(Reportable::getReportableRemoteHandler));
}
private Queue<Installable> getInstallableComponentsByPriority(BundleReader bundleReader,
InstallAction conflictStrategy, InstallPlan installPlan) {
return processorMap.values().stream()
.map(processor -> processor.process(bundleReader, conflictStrategy, installPlan))
.flatMap(List::stream)
.sorted(Comparator.comparingInt(Installable::getPriority))
.collect(Collectors.toCollection(ArrayDeque::new));
}
private void saveAsInstalledBundle(EntandoDeBundle bundle, EntandoBundleJobEntity job) {
EntandoBundleEntity installedComponent = bundleService.convertToEntityFromEcr(bundle);
installedComponent.setVersion(job.getComponentVersion());
installedComponent.setJob(job);
installedComponent.setBundleType(BundleUtilities.extractBundleTypeFromBundle(bundle).toString());
installedComponent.setInstalled(true);
bundleRepository.save(installedComponent);
log.info("Component " + job.getComponentId() + " registered as installed in the system");
}
private boolean isUninstallable(EntandoBundleComponentJobEntity component) {
/* TODO: related to ENG-415 (https://jira.entando.org/browse/ENG-415)
Except for IN_PROGRESS, everything should be uninstallable
Uninstall operations should be idempotent to be able to provide this
*/
return component.getStatus().equals(JobStatus.INSTALL_COMPLETED)
|| (component.getStatus().equals(JobStatus.INSTALL_ERROR)
&& component.getComponentType() == ComponentType.PLUGIN);
}
private JobResult executeRollback(Installable<?> installable) {
return installable.uninstall()
.thenApply(vd -> JobResult.builder().status(JobStatus.INSTALL_ROLLBACK).build())
.exceptionally(th -> {
log.error(String.format("Error rolling back %s %s",
installable.getComponentType(),
installable.getName()), th);
String message = getMeaningfulErrorMessage(th, installable);
return JobResult.builder()
.status(JobStatus.INSTALL_ROLLBACK_ERROR)
.rollbackException(new EntandoComponentManagerException(message))
.build();
})
.join();
}
private <T extends Descriptor> JobResult executeInstall(Installable<T> installable) {
CompletableFuture<?> future = installable.install();
CompletableFuture<JobResult> installResult = future
.thenApply(vd -> {
log.debug("Installable '{}' finished successfully", installable.getName());
return JobResult.builder().status(JobStatus.INSTALL_COMPLETED).build();
}).exceptionally(th -> {
String message = getMeaningfulErrorMessage(th, installable);
log.error("Installable '{}' has errors: {}", installable.getName(), message, th);
return JobResult.builder()
.status(JobStatus.INSTALL_ERROR)
.installException(new EntandoComponentManagerException(message))
.build();
});
return installResult.join();
}
}