I have vault pod in k8s and running a python script to connect to vault using jwt token and grab the required vault token name and value. Then I need to check the expiry date of the token, so I am using request method to get the token’s information and check its ttl.
But for some reason I am seeing the service_account’s token details in the output instead of actual token information
Python script, here k8s_token will authenticate to vault using role, jwt_token which is working and also fetching the required tokens, and then it should get the token_param’s info instead of service_account’s token info, I also checked that token_param is the actual token that I am passing here to check its info
import argparse
import time
import os
import requests
import json
import math
import re
import traceback
from pathlib import Path
from settings.cryptutils import decryptValuesEncryptedWithAES
def getK8AuthUrl(baseUrl):
return "{0}/v1/auth/kubernetes/login".format(baseUrl)
def getK8AuthToken():
K8S_TOKEN_FILE = '/var/run/secrets/kubernetes.io/serviceaccount/token'
TOKEN_FILE = open(K8S_TOKEN_FILE,'r')
jwt = TOKEN_FILE.read()
return jwt
def getK8AuthHeader() -> dict:
return {
"content-Type": "application/json"
}
def handleSuccessResponse(response, url):
data = response.json()
auth = data.get("auth", None)
if not data:
print("ERROR", "No data from:" + url )
return ""
auth = data.get("auth", None)
if not auth:
print("ERROR", "No auth node:" + url)
return ""
return auth.get('client_token', "")
def handleErrorResponse(response, url):
if response.status_code == 503:
print("ERROR","Vault is sealed, URL: " + url)
elif response.status_code == 403:
print("ERROR","Invalid vault token, URL: " + url)
elif response.status_code == 500:
print("ERROR, Server unresponsive " + url)
exit(4)
return ""
def authVaultUsingK8s(vault_url, appRole):
try:
url = getK8AuthUrl(vault_url)
jwtToken = getK8AuthToken()
header = getK8AuthHeader()
payload = {
"role" : appRole,
"jwt" : jwtToken
}
response = requests.post(url, data=json.dumps(payload), headers=header, verify=k8_Cert)
if response.status_code == 200:
return handleSuccessResponse(response, url)
return handleErrorResponse(response, url)
except Exception as e:
print('Exception: ' + str(e))
traceback.print_exc()
exit(4)
def unwrapIfNeeded(key):
if key and len(key) % 32 == 0:
return decryptValuesEncryptedWithAES(key)
else:
return key
def send_req(url, token, action, payload={}):
headers = { 'X-Vault-Token': k8s_token }
try:
response = requests.request(action, url, headers=headers, verify=k8_Cert)
if response.status_code != 200:
print('Connecting to ' + url + ' failed with status_code : ' + str(response.status_code))
exit(4)
else:
return json.loads(response.text)
except Exception as e:
print('Exception: ' + str(e))
traceback.print_exc()
exit(4)
def check_token_expiration(token_param):
token_name = str(token_param)
print("token_param is "+ token_name)
check_token = send_req(url=vault_url+'/v1/auth/token/lookup-self', action='GET', token=token_param)
token_expiration=math.floor(check_token['data']['ttl']/ 60 / 60 / 24)
print("\ncheck token informatio: " + str(check_token) + "\n\n")
print("Vault token will expire in "+ str(token_expiration) +" days")
if token_expiration <= warn_days:
renew_token = send_req(url=vault_url+'/v1/auth/token/renew-self', token=token_param, action='POST', payload={"token":token_param})
print("Token has been renewed")
def find_other_tokens(k8s_token):
headers = { 'X-Vault-Token': k8s_token }
try:
url = vault_url+'/v1/secret/common'
response = requests.request('GET', url, headers=headers, verify=k8_Cert)
if response.status_code != 200:
print('Connecting to ' + url + ' failed with status_code : ' + str(response.status_code))
exit(4)
else:
#return json.loads(response.text)
response_json=json.loads(response.text)
if 'data' in response_json:
for token_name in response_json['data']:
if re.search('-vault-token', token_name):
other_tokens.append({"token_name": token_name, "token_value": response_json['data'][ token_name ]})
else:
print('JSON response does not contain the "data" key')
exit(4)
except Exception as e:
print('Exception: ' + str(e))
traceback.print_exc()
exit(4)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--vault-url', help='Vault API url (make sure it does not end with /)', required=False, default='https://XXXXX:8200')
parser.add_argument('--vault-token', help='Vault Application Access Access Token wrapped', required=True)
parser.add_argument('--warn-days', help='Number of days for token expiration to warn', required=True)
args = parser.parse_args()
vault_url = args.vault_url
warn_days = int(args.warn_days)
k8_Cert = '/var/run/secrets/kubernetes.io/serviceaccount/ca.crt'
if not os.path.exists( k8_Cert ):
print('K8S cert file ' + k8_Cert + ' does not exist')
exit(5)
os.chdir('/tmp')
if args.vault_token == "k8s":
vaultRole = "app-role"
k8s_token = authVaultUsingK8s(vault_url, vaultRole)
else:
k8s_token = unwrapIfNeeded(args.vault_token)
header = {'Content-Type': 'application/json', 'X-Vault-Token': k8s_token}
backup_dict = {}
other_tokens=[]
find_other_tokens(k8s_token)
for token_entry in other_tokens:
print("\n\n\n\nChecking " + token_entry['token_name'] + " for expiration")
other_token_entry=unwrapIfNeeded(token_entry['token_value'])
check_token_expiration(other_token_entry)
I am seeing this random token’s(service account’s) output instead of actual token’s data
token info: {'request_id': '6017f043-b776-c4dc-7b88-3f3260f40fae', 'lease_id': '', 'renewable': False, 'lease_duration': 0, 'data': {'accessor': 'zt9mTKNpeIHnnVRAxxxxxx', 'creation_time': 1676069816, 'creation_ttl': 3600, 'display_name': 'kubernetes-util-vault-auth', 'entity_id': 'a97ad241-737d-4976-d6c5-20eb6fc5da90', 'expire_time': '2023-02-10T18:56:56.943311023-05:00', 'explicit_max_ttl': 0, 'id': 'xxxxxx', 'issue_time': '2023-02-10T17:56:56.943310771-05:00', 'meta': {'role': 'app-role', 'service_account_name': 'vault-auth', 'service_account_namespace': 'util', 'service_account_secret_name': 'vault-auth-token-n4mqb', 'service_account_uid': '00276763-14a1-40f9-b02e-5fd7ba77d524'}, 'num_uses': 0, 'orphan': True, 'path': 'auth/kubernetes/login', 'policies': ['app-role', 'default'], 'renewable': True, 'ttl': 3599, 'type': 'service'}, 'wrap_info': None, 'warnings': None, 'auth': None}