DefaultK8SServiceClient.java
package org.entando.kubernetes.client.k8ssvc;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.extensions.Ingress;
import io.fabric8.kubernetes.api.model.extensions.IngressRule;
import io.fabric8.kubernetes.client.KubernetesClientException;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.entando.kubernetes.client.model.AnalysisReport;
import org.entando.kubernetes.controller.digitalexchange.job.model.Status;
import org.entando.kubernetes.exception.k8ssvc.K8SServiceClientException;
import org.entando.kubernetes.model.bundle.ComponentType;
import org.entando.kubernetes.model.bundle.reportable.Reportable;
import org.entando.kubernetes.model.debundle.EntandoDeBundle;
import org.entando.kubernetes.model.link.EntandoAppPluginLink;
import org.entando.kubernetes.model.plugin.EntandoPlugin;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.hateoas.CollectionModel;
import org.springframework.hateoas.EntityModel;
import org.springframework.hateoas.Link;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.client.Hop;
import org.springframework.hateoas.client.Traverson;
import org.springframework.hateoas.mediatype.hal.Jackson2HalModule;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.security.oauth2.client.OAuth2RestTemplate;
import org.springframework.security.oauth2.client.token.grant.client.ClientCredentialsResourceDetails;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestClientResponseException;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
public class DefaultK8SServiceClient implements K8SServiceClient {
public static final String APPS_ENDPOINT = "apps";
public static final String PLUGINS_ENDPOINT = "plugins";
public static final String BUNDLES_ENDPOINT = "bundles";
public static final String APP_PLUGIN_LINKS_ENDPOINT = "app-plugin-links";
public static final String ERROR_RETRIEVING_BUNDLE_WITH_NAME = "An error occurred while retrieving bundle with name ";
private static final Logger LOGGER = Logger.getLogger(DefaultK8SServiceClient.class.getName());
private final String k8sServiceUrl;
private Path tokenFilePath;
private RestTemplate restTemplate;
private RestTemplate noAuthRestTemplate;
private Traverson traverson;
public DefaultK8SServiceClient(String k8sServiceUrl, String tokenFilePath, boolean normalizeK8sServiceUrl) {
this.tokenFilePath = Paths.get(tokenFilePath);
this.restTemplate = newRestTemplate();
if (normalizeK8sServiceUrl && ! k8sServiceUrl.endsWith("/")) {
k8sServiceUrl += "/";
}
this.k8sServiceUrl = k8sServiceUrl;
this.traverson = newTraverson();
this.noAuthRestTemplate = newNoAuthRestTemplate();
}
public Traverson newTraverson() {
return new Traverson(URI.create(k8sServiceUrl), MediaTypes.HAL_JSON, MediaType.APPLICATION_JSON)
.setRestOperations(getRestTemplate());
}
public RestTemplate getRestTemplate() {
return restTemplate;
}
public void setRestTemplate(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
this.traverson = newTraverson();
}
public void setNoAuthRestTemplate(RestTemplate restTemplate) {
this.noAuthRestTemplate = restTemplate;
}
@Override
public List<EntandoAppPluginLink> getAppLinks(String entandoAppName) {
return tryOrThrow(() -> {
CollectionModel<EntityModel<EntandoAppPluginLink>> links = traverson
.follow(APP_PLUGIN_LINKS_ENDPOINT)
.follow(Hop.rel("app-links").withParameter("app", entandoAppName))
.toObject(new ParameterizedTypeReference<CollectionModel<EntityModel<EntandoAppPluginLink>>>() {
});
assert links != null;
return links.getContent().stream()
.map(EntityModel::getContent)
.collect(Collectors.toList());
});
}
@Override
public EntandoPlugin getPluginForLink(EntandoAppPluginLink el) {
return tryOrThrow(() -> traverson.follow(APP_PLUGIN_LINKS_ENDPOINT)
.follow(Hop.rel("app-plugin-link").withParameter("name", el.getMetadata().getName()))
.follow("plugin")
.toObject(new ParameterizedTypeReference<EntityModel<EntandoPlugin>>() {
})
.getContent(), "get plugin associated with link " + el.getMetadata().getName());
}
@Override
public Optional<EntandoPlugin> getPluginByName(String name) {
EntandoPlugin plugin = null;
try {
plugin = traverson.follow(PLUGINS_ENDPOINT)
.follow(Hop.rel("plugin").withParameter("name", name))
.toObject(new ParameterizedTypeReference<EntityModel<EntandoPlugin>>() {
})
.getContent();
} catch (RestClientResponseException ex) {
if (ex.getRawStatusCode() != 404) {
throw new KubernetesClientException("An error occurred while retrieving plugin with name " + name, ex);
}
}
return Optional.ofNullable(plugin);
}
/**
* UPDATES A PLUGIN.
* <p>
* updates a plugin CR given the CR object
* </p>
*/
@Override
public EntandoPlugin updatePlugin(EntandoPlugin plugin) {
String pluginName = plugin.getMetadata().getName();
URI updateURI = traverson.follow(PLUGINS_ENDPOINT)
.follow(Hop.rel("create-or-replace-plugin").withParameter("name", pluginName))
.asLink().toUri();
return tryOrThrow(() -> {
RequestEntity<EntandoPlugin> request = RequestEntity
.put(updateURI)
.contentType(MediaType.APPLICATION_JSON)
.body(plugin);
ResponseEntity<EntityModel<EntandoPlugin>> response = restTemplate
.exchange(request, new ParameterizedTypeReference<EntityModel<EntandoPlugin>>() {
});
if (response.getStatusCode().is2xxSuccessful() && response.getBody() != null) {
return response.getBody().getContent();
} else {
throw new RestClientResponseException("Update process failed",
response.getStatusCodeValue(), response.getStatusCode().getReasonPhrase(),
null, null, null);
}
}, String.format("while updating plugin %s", pluginName));
}
@Override
public void unlink(EntandoAppPluginLink el) {
String linkName = el.getMetadata().getName();
String appName = el.getSpec().getEntandoAppName();
String pluginName = el.getSpec().getEntandoAppNamespace().orElse(el.getMetadata().getNamespace());
Link unlinkHref = traverson.follow(APP_PLUGIN_LINKS_ENDPOINT)
.follow(Hop.rel("app-plugin-link").withParameter("name", linkName))
.asLink();
tryOrThrow(() -> restTemplate.delete(unlinkHref.toUri()),
String.format("unlink app %s and plugin %s", appName, pluginName));
}
@Override
public EntandoAppPluginLink linkAppWithPlugin(String name, String namespace, EntandoPlugin plugin) {
URI linkToCall = tryOrThrow(() -> {
Link linkToApp = traverson.follow("apps")
.follow(Hop.rel("app").withParameter("name", name))
.asLink();
return UriComponentsBuilder.fromUri(linkToApp.toUri()).pathSegment("links").build(Collections.emptyMap());
});
return tryOrThrow(() -> {
RequestEntity<EntandoPlugin> request = RequestEntity
.post(linkToCall)
.contentType(MediaType.APPLICATION_JSON)
.body(plugin);
ResponseEntity<EntityModel<EntandoAppPluginLink>> resp = restTemplate
.exchange(request, new ParameterizedTypeReference<EntityModel<EntandoAppPluginLink>>() {
});
if (resp.getStatusCode().is2xxSuccessful() && resp.getBody() != null) {
return resp.getBody().getContent();
}
throw new RestClientResponseException("Linking process failed",
resp.getStatusCodeValue(), resp.getStatusCode().getReasonPhrase(),
null, null, null);
},
String.format("linking app %s to plugin %s", name, plugin.getMetadata().getName())
);
}
@Override
public Optional<EntandoAppPluginLink> getLinkByName(String linkName) {
EntandoAppPluginLink link = null;
try {
link = traverson.follow(APP_PLUGIN_LINKS_ENDPOINT)
.follow(Hop.rel("app-plugin-link").withParameter("name", linkName))
.toObject(new ParameterizedTypeReference<EntityModel<EntandoAppPluginLink>>() {
})
.getContent();
} catch (RestClientResponseException ex) {
if (ex.getRawStatusCode() != 404) {
throw new KubernetesClientException(
"An error occurred while retrieving entando-app-plugin-link with name " + linkName, ex);
}
}
return Optional.ofNullable(link);
}
@Override
public List<EntandoDeBundle> getBundlesInObservedNamespaces() {
LOGGER.info("### fetching bundles from all namespaces");
return tryOrThrow(() -> traverson.follow(BUNDLES_ENDPOINT)
.toObject(new ParameterizedTypeReference<CollectionModel<EntityModel<EntandoDeBundle>>>() {
})
.getContent()
.stream().map(EntityModel::getContent)
.collect(Collectors.toList()));
}
@Override
public List<EntandoDeBundle> getBundlesInNamespace(String namespace) {
LOGGER.info("### fetching bundles from " + namespace + " namespace");
return tryOrThrow(() -> traverson.follow(Hop.rel(BUNDLES_ENDPOINT).withParameter("namespace", namespace))
.toObject(new ParameterizedTypeReference<CollectionModel<EntityModel<EntandoDeBundle>>>() {
})
.getContent()
.stream().map(EntityModel::getContent)
.collect(Collectors.toList()));
}
@Override
public List<EntandoDeBundle> getBundlesInNamespaces(List<String> namespaces) {
@SuppressWarnings("unchecked")
CompletableFuture<List<EntandoDeBundle>>[] futures = namespaces.stream()
.map(n -> CompletableFuture.supplyAsync(() -> getBundlesInNamespace(n))
.exceptionally(ex -> {
LOGGER.log(Level.SEVERE, "An error occurred while retrieving bundle from a namespace", ex);
return Collections.emptyList();
}))
.toArray(CompletableFuture[]::new);
return CompletableFuture.allOf(futures).thenApply(v -> Arrays.stream(futures)
.map(CompletableFuture::join)
.flatMap(Collection::stream)
.collect(Collectors.toList())
).join();
}
@Override
public Optional<EntandoDeBundle> getBundleWithName(String name) {
EntandoDeBundle bundle = null;
try {
bundle = traverson.follow(BUNDLES_ENDPOINT)
.follow(Hop.rel("bundle").withParameter("name", name))
.toObject(new ParameterizedTypeReference<EntityModel<EntandoDeBundle>>() {
})
.getContent();
} catch (RestClientResponseException ex) {
if (ex.getRawStatusCode() != 404) {
throw new KubernetesClientException(ERROR_RETRIEVING_BUNDLE_WITH_NAME + name, ex);
}
} catch (Exception ex) {
throw new KubernetesClientException(ERROR_RETRIEVING_BUNDLE_WITH_NAME + name, ex);
}
return Optional.ofNullable(bundle);
}
@Override
public Optional<EntandoDeBundle> getBundleWithNameAndNamespace(String name, String namespace) {
EntandoDeBundle bundle = null;
try {
final EntityModel<EntandoDeBundle> entityModel = traverson.follow(BUNDLES_ENDPOINT)
.follow(Hop.rel("bundle")
.withParameter("name", name)
.withParameter("namespace", namespace))
.toObject(new ParameterizedTypeReference<EntityModel<EntandoDeBundle>>() {
});
if (entityModel != null) {
bundle = entityModel.getContent();
}
} catch (RestClientResponseException ex) {
if (ex.getRawStatusCode() != 404) {
throw new KubernetesClientException(ERROR_RETRIEVING_BUNDLE_WITH_NAME + name, ex);
}
} catch (Exception ex) {
throw new KubernetesClientException(ERROR_RETRIEVING_BUNDLE_WITH_NAME + name, ex);
}
return Optional.ofNullable(bundle);
}
@Override
public boolean isPluginReadyToServeApp(EntandoPlugin plugin, String appName) {
if (plugin.getSpec().getIngressPath() == null) {
return false;
}
Ingress appIngress = getAppIngress(appName);
IngressRule ingressRule = appIngress.getSpec().getRules().stream().findFirst().<RuntimeException>orElseThrow(
() -> {
throw new K8SServiceClientException(
"EntandoApp ingress " + appIngress.getMetadata().getName() + " does not have an host");
});
String appHost = ingressRule.getHost();
UriComponents pluginHealthCheck = UriComponentsBuilder.newInstance()
.scheme(appIngress.getSpec().getTls().isEmpty() ? "http" : "https")
.host(appHost)
.path(plugin.getSpec().getIngressPath())
.path(plugin.getSpec().getHealthCheckPath())
.build();
RequestEntity<?> request = RequestEntity
.get(URI.create(pluginHealthCheck.toUriString()))
.accept(MediaType.APPLICATION_JSON)
.build();
try {
ResponseEntity<Object> response = this.noAuthRestTemplate.exchange(request, Object.class);
return response.getStatusCode().is2xxSuccessful();
} catch (RestClientResponseException e) {
HttpStatus status = HttpStatus.valueOf(e.getRawStatusCode());
if (status.equals(HttpStatus.NOT_FOUND) || status.equals(HttpStatus.SERVICE_UNAVAILABLE)) {
return false;
}
throw e;
} catch (RestClientException e) {
throw e;
}
}
@Override
public AnalysisReport getAnalysisReport(List<Reportable> reportableList) {
Map<String, Status> pluginStatusMap = reportableList.stream()
.filter(reportable -> reportable.getComponentType() == ComponentType.PLUGIN)
.flatMap(reportable -> reportable.getCodes().stream())
.map(name ->
getPluginByName(name)
.map(plugin -> new SimpleEntry<>(name, Status.DIFF))
.orElseGet(() -> new SimpleEntry<>(name, Status.NEW)))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
return new AnalysisReport().setPlugins(pluginStatusMap);
}
private Ingress getAppIngress(String appName) {
return tryOrThrow(() -> traverson.follow(APPS_ENDPOINT)
.follow(Hop.rel("app").withParameter("name", appName))
.follow("app-ingress")
.toObject(new ParameterizedTypeReference<EntityModel<Ingress>>() {
})
.getContent());
}
private RestTemplate newRestTemplate() {
final OAuth2RestTemplate template = new OAuth2RestTemplate(new ClientCredentialsResourceDetails());
template.setRequestFactory(getRequestFactory());
template.setAccessTokenProvider(new FromFileTokenProvider(this.tokenFilePath));
return setMessageConverters(template);
}
private RestTemplate newNoAuthRestTemplate() {
final RestTemplate template = new RestTemplate();
template.setRequestFactory(getRequestFactory());
return setMessageConverters(template);
}
private RestTemplate setMessageConverters(RestTemplate restTemplate) {
List<HttpMessageConverter<?>> messageConverters = Traverson
.getDefaultMessageConverters(MediaType.APPLICATION_JSON, MediaTypes.HAL_JSON);
if (messageConverters.stream()
.noneMatch(mc -> mc.getSupportedMediaTypes().contains(MediaType.APPLICATION_JSON))) {
messageConverters.add(0, getJsonConverter());
}
restTemplate.setMessageConverters(messageConverters);
return restTemplate;
}
private HttpMessageConverter<?> getJsonConverter() {
final List<MediaType> supportedMediatypes = Arrays.asList(MediaType.APPLICATION_JSON);
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new Jackson2HalModule());
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter();
converter.setObjectMapper(mapper);
converter.setSupportedMediaTypes(supportedMediatypes);
return converter;
}
private ClientHttpRequestFactory getRequestFactory() {
final HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory();
final int timeout = 10000;
requestFactory.setConnectionRequestTimeout(timeout);
requestFactory.setConnectTimeout(timeout);
requestFactory.setReadTimeout(timeout);
return requestFactory;
}
public void tryOrThrow(Runnable runnable, String actionDescription) {
try {
runnable.run();
} catch (RestClientResponseException ex) {
throw new KubernetesClientException(
String.format("An error occurred while %s: %d - %s",
actionDescription,
ex.getRawStatusCode(),
ex.getResponseBodyAsString()),
ex);
} catch (Exception ex) {
throw new KubernetesClientException("A generic error occurred while " + actionDescription, ex);
}
}
public <T> T tryOrThrow(Supplier<T> supplier) {
return tryOrThrow(supplier, "talking with k8s-service");
}
public <T> T tryOrThrow(Supplier<T> supplier, String action) {
try {
return supplier.get();
} catch (RestClientResponseException ex) {
throw new KubernetesClientException(
String.format("An error occurred while %s: %d - %s",
action,
ex.getRawStatusCode(),
ex.getResponseBodyAsString()),
ex);
} catch (Exception ex) {
throw new KubernetesClientException("A generic error occurred while " + action, ex);
}
}
}