[bitnami/kafka] bugfix: use kafka-broker-api-versions.sh to wait for Kafka on provisioning (#32613)

This commit is contained in:
Juan Ariza Toledano
2025-03-26 13:28:21 +01:00
committed by GitHub
parent 2120902bdf
commit 328166c403
5 changed files with 148 additions and 111 deletions

View File

@@ -47,8 +47,116 @@ spec:
{{- if .Values.provisioning.tolerations }}
tolerations: {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.tolerations "context" .) | nindent 8 }}
{{- end }}
{{- if or .Values.provisioning.initContainers .Values.provisioning.waitForKafka }}
initContainers:
- name: prepare-config
image: {{ include "kafka.image" . }}
imagePullPolicy: {{ .Values.image.pullPolicy | quote }}
{{- if .Values.provisioning.containerSecurityContext.enabled }}
securityContext: {{- include "common.compatibility.renderSecurityContext" (dict "secContext" .Values.provisioning.containerSecurityContext "context" $) | nindent 12 }}
{{- end }}
command:
- /bin/bash
args:
- -ec
- |
. /opt/bitnami/scripts/libkafka.sh
if [[ ! -f "$KAFKA_CONF_FILE" ]]; then
touch $KAFKA_CONF_FILE
kafka_server_conf_set security.protocol {{ .Values.listeners.client.protocol | quote }}
{{- if regexFind "SSL" (upper .Values.listeners.client.protocol) }}
kafka_server_conf_set ssl.keystore.type {{ upper .Values.provisioning.auth.tls.type | quote }}
kafka_server_conf_set ssl.truststore.type {{ upper .Values.provisioning.auth.tls.type | quote }}
! is_empty_value "$KAFKA_CLIENT_KEY_PASSWORD" && kafka_server_conf_set ssl.key.password "$KAFKA_CLIENT_KEY_PASSWORD"
{{- if eq (upper .Values.provisioning.auth.tls.type) "PEM" }}
{{- if .Values.provisioning.auth.tls.caCert }}
file_to_multiline_property() {
awk 'NR > 1{print line" \\"}{line=$0;}END{print $0" "}' <"${1:?missing file}"
}
kafka_server_conf_set ssl.keystore.key "$(file_to_multiline_property "/certs/{{ .Values.provisioning.auth.tls.key }}")"
kafka_server_conf_set ssl.keystore.certificate.chain "$(file_to_multiline_property "/certs/{{ .Values.provisioning.auth.tls.cert }}")"
kafka_server_conf_set ssl.truststore.certificates "$(file_to_multiline_property "/certs/{{ .Values.provisioning.auth.tls.caCert }}")"
{{- else }}
kafka_server_conf_set ssl.keystore.location "/certs/{{ .Values.provisioning.auth.tls.keystore }}"
kafka_server_conf_set ssl.truststore.location "/certs/{{ .Values.provisioning.auth.tls.truststore }}"
{{- end }}
{{- else if eq (upper .Values.provisioning.auth.tls.type) "JKS" }}
kafka_server_conf_set ssl.keystore.location "/certs/{{ .Values.provisioning.auth.tls.keystore }}"
kafka_server_conf_set ssl.truststore.location "/certs/{{ .Values.provisioning.auth.tls.truststore }}"
! is_empty_value "$KAFKA_CLIENT_KEYSTORE_PASSWORD" && kafka_server_conf_set ssl.keystore.password "$KAFKA_CLIENT_KEYSTORE_PASSWORD"
! is_empty_value "$KAFKA_CLIENT_TRUSTSTORE_PASSWORD" && kafka_server_conf_set ssl.truststore.password "$KAFKA_CLIENT_TRUSTSTORE_PASSWORD"
{{- end }}
{{- end }}
{{- if regexFind "SASL" (upper .Values.listeners.client.protocol) }}
{{- if regexFind "PLAIN" ( upper .Values.sasl.enabledMechanisms) }}
kafka_server_conf_set sasl.mechanism PLAIN
kafka_server_conf_set sasl.jaas.config "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$SASL_USERNAME\" password=\"$SASL_USER_PASSWORD\";"
{{- else if regexFind "SCRAM-SHA-256" ( upper .Values.sasl.enabledMechanisms) }}
kafka_server_conf_set sasl.mechanism SCRAM-SHA-256
kafka_server_conf_set sasl.jaas.config "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"$SASL_USERNAME\" password=\"$SASL_USER_PASSWORD\";"
{{- else if regexFind "SCRAM-SHA-512" ( upper .Values.sasl.enabledMechanisms) }}
kafka_server_conf_set sasl.mechanism SCRAM-SHA-512
kafka_server_conf_set sasl.jaas.config "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"$SASL_USERNAME\" password=\"$SASL_USER_PASSWORD\";"
{{- else if regexFind "OAUTHBEARER" ( upper .Values.sasl.enabledMechanisms) }}
kafka_server_conf_set sasl.mechanism OAUTHBEARER
kafka_server_conf_set sasl.jaas.config "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=\"$SASL_CLIENT_ID\" password=\"$SASL_CLIENT_SECRET\";"
kafka_server_conf_set sasl.login.callback.handler.class "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
kafka_server_conf_set sasl.oauthbearer.token.endpoint.url {{ .Values.sasl.oauthbearer.tokenEndpointUrl | quote }}
{{- end }}
{{- end }}
fi
env:
- name: KAFKA_CONF_FILE
value: /shared/client.properties
{{- if and (regexFind "SSL" (upper .Values.listeners.client.protocol)) .Values.provisioning.auth.tls.passwordsSecret }}
- name: KAFKA_CLIENT_KEY_PASSWORD
valueFrom:
secretKeyRef:
name: {{ template "kafka.client.passwordsSecretName" . }}
key: {{ .Values.provisioning.auth.tls.keyPasswordSecretKey }}
- name: KAFKA_CLIENT_KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
name: {{ template "kafka.client.passwordsSecretName" . }}
key: {{ .Values.provisioning.auth.tls.keystorePasswordSecretKey }}
- name: KAFKA_CLIENT_TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
name: {{ template "kafka.client.passwordsSecretName" . }}
key: {{ .Values.provisioning.auth.tls.truststorePasswordSecretKey }}
{{- end }}
{{- if regexFind "SASL" (upper .Values.listeners.client.protocol) }}
{{- if include "kafka.saslUserPasswordsEnabled" . }}
- name: SASL_USERNAME
value: {{ index .Values.sasl.client.users 0 | quote }}
- name: SASL_USER_PASSWORD
valueFrom:
secretKeyRef:
name: {{ include "kafka.saslSecretName" . }}
key: system-user-password
{{- end }}
{{- if include "kafka.saslClientSecretsEnabled" . }}
- name: SASL_CLIENT_ID
value: {{ .Values.sasl.interbroker.clientId | quote }}
- name: SASL_USER_PASSWORD
valueFrom:
secretKeyRef:
name: {{ include "kafka.saslSecretName" . }}
key: inter-broker-client-secret
{{- end }}
{{- end }}
{{- if .Values.provisioning.resources }}
resources: {{- toYaml .Values.provisioning.resources | nindent 12 }}
{{- else if ne .Values.provisioning.resourcesPreset "none" }}
resources: {{- include "common.resources.preset" (dict "type" .Values.provisioning.resourcesPreset) | nindent 12 }}
{{- end }}
volumeMounts:
- name: shared
mountPath: /shared
{{- if .Values.provisioning.initContainers }}
{{- include "common.tplvalues.render" ( dict "value" .Values.provisioning.initContainers "context" $ ) | nindent 8 }}
{{- end }}
{{- if .Values.provisioning.waitForKafka }}
- name: wait-for-available-kafka
image: {{ include "kafka.image" . }}
@@ -61,22 +169,32 @@ spec:
args:
- -ec
- |
wait-for-port \
--host={{ include "common.names.fullname" . }} \
--state=inuse \
--timeout=120 \
{{ .Values.service.ports.client | int64 }};
echo "Kafka is available";
. /opt/bitnami/scripts/libos.sh
exit_code=0
if ! retry_while "/opt/bitnami/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server ${KAFKA_SERVICE} --command-config /shared/client.properties"; then
echo "Kafka is not ready"
exit_code=1
else
echo "Kafka ready"
fi
exit "$exit_code"
env:
- name: KAFKA_SERVICE
value: {{ printf "%s:%d" (include "common.names.fullname" .) (.Values.service.ports.client | int64) }}
{{- if .Values.provisioning.resources }}
resources: {{- toYaml .Values.provisioning.resources | nindent 12 }}
{{- else if ne .Values.provisioning.resourcesPreset "none" }}
resources: {{- include "common.resources.preset" (dict "type" .Values.provisioning.resourcesPreset) | nindent 12 }}
{{- end }}
volumeMounts:
- name: shared
mountPath: /shared
{{- end }}
{{- if .Values.provisioning.initContainers }}
{{- include "common.tplvalues.render" ( dict "value" .Values.provisioning.initContainers "context" $ ) | nindent 8 }}
{{- end }}
{{- end }}
containers:
- name: kafka-provisioning
image: {{ include "kafka.image" . }}
@@ -98,57 +216,8 @@ spec:
args: {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.args "context" $) | nindent 12 }}
{{- else }}
args:
- -efc
- -ec
- |
. /opt/bitnami/scripts/libkafka.sh
export CLIENT_CONF="${CLIENT_CONF:-/tmp/client.properties}"
if [[ ! -f "$CLIENT_CONF" ]]; then
touch $CLIENT_CONF
kafka_common_conf_set "$CLIENT_CONF" security.protocol {{ .Values.listeners.client.protocol | quote }}
{{- if regexFind "SSL" (upper .Values.listeners.client.protocol) }}
kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.type {{ upper .Values.provisioning.auth.tls.type | quote }}
kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.type {{ upper .Values.provisioning.auth.tls.type | quote }}
! is_empty_value "$KAFKA_CLIENT_KEY_PASSWORD" && kafka_common_conf_set "$CLIENT_CONF" ssl.key.password "$KAFKA_CLIENT_KEY_PASSWORD"
{{- if eq (upper .Values.provisioning.auth.tls.type) "PEM" }}
{{- if .Values.provisioning.auth.tls.caCert }}
file_to_multiline_property() {
awk 'NR > 1{print line" \\"}{line=$0;}END{print $0" "}' <"${1:?missing file}"
}
kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.key "$(file_to_multiline_property "/certs/{{ .Values.provisioning.auth.tls.key }}")"
kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.certificate.chain "$(file_to_multiline_property "/certs/{{ .Values.provisioning.auth.tls.cert }}")"
kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.certificates "$(file_to_multiline_property "/certs/{{ .Values.provisioning.auth.tls.caCert }}")"
{{- else }}
kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.location "/certs/{{ .Values.provisioning.auth.tls.keystore }}"
kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.location "/certs/{{ .Values.provisioning.auth.tls.truststore }}"
{{- end }}
{{- else if eq (upper .Values.provisioning.auth.tls.type) "JKS" }}
kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.location "/certs/{{ .Values.provisioning.auth.tls.keystore }}"
kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.location "/certs/{{ .Values.provisioning.auth.tls.truststore }}"
! is_empty_value "$KAFKA_CLIENT_KEYSTORE_PASSWORD" && kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.password "$KAFKA_CLIENT_KEYSTORE_PASSWORD"
! is_empty_value "$KAFKA_CLIENT_TRUSTSTORE_PASSWORD" && kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.password "$KAFKA_CLIENT_TRUSTSTORE_PASSWORD"
{{- end }}
{{- end }}
{{- if regexFind "SASL" (upper .Values.listeners.client.protocol) }}
{{- if regexFind "PLAIN" ( upper .Values.sasl.enabledMechanisms) }}
kafka_common_conf_set "$CLIENT_CONF" sasl.mechanism PLAIN
kafka_common_conf_set "$CLIENT_CONF" sasl.jaas.config "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$SASL_USERNAME\" password=\"$SASL_USER_PASSWORD\";"
{{- else if regexFind "SCRAM-SHA-256" ( upper .Values.sasl.enabledMechanisms) }}
kafka_common_conf_set "$CLIENT_CONF" sasl.mechanism SCRAM-SHA-256
kafka_common_conf_set "$CLIENT_CONF" sasl.jaas.config "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"$SASL_USERNAME\" password=\"$SASL_USER_PASSWORD\";"
{{- else if regexFind "SCRAM-SHA-512" ( upper .Values.sasl.enabledMechanisms) }}
kafka_common_conf_set "$CLIENT_CONF" sasl.mechanism SCRAM-SHA-512
kafka_common_conf_set "$CLIENT_CONF" sasl.jaas.config "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"$SASL_USERNAME\" password=\"$SASL_USER_PASSWORD\";"
{{- else if regexFind "OAUTHBEARER" ( upper .Values.sasl.enabledMechanisms) }}
kafka_common_conf_set "$CLIENT_CONF" sasl.mechanism OAUTHBEARER
kafka_common_conf_set "$CLIENT_CONF" sasl.jaas.config "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=\"$SASL_CLIENT_ID\" password=\"$SASL_CLIENT_SECRET\";"
kafka_common_conf_set "$CLIENT_CONF" sasl.login.callback.handler.class "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
kafka_common_conf_set "$CLIENT_CONF" sasl.oauthbearer.token.endpoint.url {{ .Values.sasl.oauthbearer.tokenEndpointUrl | quote }}
{{- end }}
{{- end }}
fi
{{- if .Values.provisioning.preScript }}
echo "Running pre-provisioning script"
{{ .Values.provisioning.preScript | nindent 14 }}
@@ -165,7 +234,7 @@ spec:
{{- range $name, $value := $topic.config }}
--config {{ $name }}={{ $value }} \
{{- end }}
--command-config ${CLIENT_CONF} \
--command-config /shared/client.properties \
--topic {{ $topic.name }}"
{{- end }}
{{- range $command := .Values.provisioning.extraProvisioningCommands }}
@@ -192,45 +261,8 @@ spec:
env:
- name: BITNAMI_DEBUG
value: {{ ternary "true" "false" (or .Values.image.debug .Values.diagnosticMode.enabled) | quote }}
{{- if and (regexFind "SSL" (upper .Values.listeners.client.protocol)) .Values.provisioning.auth.tls.passwordsSecret }}
- name: KAFKA_CLIENT_KEY_PASSWORD
valueFrom:
secretKeyRef:
name: {{ template "kafka.client.passwordsSecretName" . }}
key: {{ .Values.provisioning.auth.tls.keyPasswordSecretKey }}
- name: KAFKA_CLIENT_KEYSTORE_PASSWORD
valueFrom:
secretKeyRef:
name: {{ template "kafka.client.passwordsSecretName" . }}
key: {{ .Values.provisioning.auth.tls.keystorePasswordSecretKey }}
- name: KAFKA_CLIENT_TRUSTSTORE_PASSWORD
valueFrom:
secretKeyRef:
name: {{ template "kafka.client.passwordsSecretName" . }}
key: {{ .Values.provisioning.auth.tls.truststorePasswordSecretKey }}
{{- end }}
- name: KAFKA_SERVICE
value: {{ printf "%s:%d" (include "common.names.fullname" .) (.Values.service.ports.client | int64) }}
{{- if regexFind "SASL" (upper .Values.listeners.client.protocol) }}
{{- if include "kafka.saslUserPasswordsEnabled" . }}
- name: SASL_USERNAME
value: {{ index .Values.sasl.client.users 0 | quote }}
- name: SASL_USER_PASSWORD
valueFrom:
secretKeyRef:
name: {{ include "kafka.saslSecretName" . }}
key: system-user-password
{{- end }}
{{- if include "kafka.saslClientSecretsEnabled" . }}
- name: SASL_CLIENT_ID
value: {{ .Values.sasl.interbroker.clientId | quote }}
- name: SASL_USER_PASSWORD
valueFrom:
secretKeyRef:
name: {{ include "kafka.saslSecretName" . }}
key: inter-broker-client-secret
{{- end }}
{{- end }}
{{- if .Values.provisioning.extraEnvVars }}
{{- include "common.tplvalues.render" ( dict "value" .Values.provisioning.extraEnvVars "context" $) | nindent 12 }}
{{- end }}
@@ -263,8 +295,8 @@ spec:
readOnly: true
{{- end }}
{{- end }}
- name: tmp
mountPath: /tmp
- name: shared
mountPath: /shared
{{- if .Values.provisioning.extraVolumeMounts }}
{{- include "common.tplvalues.render" (dict "value" .Values.provisioning.extraVolumeMounts "context" $) | nindent 12 }}
{{- end }}
@@ -285,7 +317,7 @@ spec:
defaultMode: 256
{{- end }}
{{- end }}
- name: tmp
- name: shared
emptyDir: {}
{{- if .Values.provisioning.extraVolumes }}
{{- include "common.tplvalues.render" (dict "value" .Values.provisioning.extraVolumes "context" $) | nindent 8 }}