Unable to fetch vault token information from k8s vault pod

I have vault pod in k8s and running a python script to connect to vault using jwt token and grab the required vault token name and value. Then I need to check the expiry date of the token, so I am using request method to get the token’s information and check its ttl.
But for some reason I am seeing the service_account’s token details in the output instead of actual token information

Python script, here k8s_token will authenticate to vault using role, jwt_token which is working and also fetching the required tokens, and then it should get the token_param’s info instead of service_account’s token info, I also checked that token_param is the actual token that I am passing here to check its info

import argparse
import time
import os
import requests
import json
import math
import re
import traceback
from pathlib import Path
from settings.cryptutils import decryptValuesEncryptedWithAES

def getK8AuthUrl(baseUrl):
    return "{0}/v1/auth/kubernetes/login".format(baseUrl)

def getK8AuthToken():
    K8S_TOKEN_FILE = '/var/run/secrets/kubernetes.io/serviceaccount/token'
    TOKEN_FILE =  open(K8S_TOKEN_FILE,'r')
    jwt = TOKEN_FILE.read()
    return jwt

def getK8AuthHeader() -> dict:
    return {
        "content-Type": "application/json"

def handleSuccessResponse(response, url):
    data = response.json()
    auth = data.get("auth", None)
    if not data:
        print("ERROR", "No data from:" + url )
        return ""
    auth = data.get("auth", None)
    if not auth:
        print("ERROR", "No auth node:" + url)
        return ""
    return auth.get('client_token', "")

def handleErrorResponse(response, url):
    if response.status_code == 503:
        print("ERROR","Vault is sealed, URL: " + url)
    elif response.status_code == 403:
        print("ERROR","Invalid vault token, URL: " + url)
    elif response.status_code == 500:
        print("ERROR, Server unresponsive " + url)
    return ""

def authVaultUsingK8s(vault_url, appRole):
        url =  getK8AuthUrl(vault_url)
        jwtToken = getK8AuthToken()
        header = getK8AuthHeader()
        payload = {
            "role" : appRole,
            "jwt" : jwtToken
        response =  requests.post(url, data=json.dumps(payload), headers=header, verify=k8_Cert)
        if response.status_code == 200:
            return handleSuccessResponse(response, url)
        return handleErrorResponse(response, url)
    except Exception as e:
        print('Exception: ' + str(e))

def unwrapIfNeeded(key):
    if key and len(key) % 32 == 0:
        return decryptValuesEncryptedWithAES(key)
        return key

def send_req(url, token, action, payload={}):

    headers = { 'X-Vault-Token': k8s_token }
        response = requests.request(action, url, headers=headers, verify=k8_Cert)

        if response.status_code != 200:
            print('Connecting to ' + url + ' failed with status_code : ' + str(response.status_code))
            return json.loads(response.text)

    except Exception as e:
        print('Exception: ' + str(e))

def check_token_expiration(token_param):

    token_name = str(token_param)

    print("token_param is "+ token_name)

    check_token = send_req(url=vault_url+'/v1/auth/token/lookup-self', action='GET', token=token_param)

    token_expiration=math.floor(check_token['data']['ttl']/ 60 / 60 / 24)

    print("\ncheck token informatio: " + str(check_token) + "\n\n")

    print("Vault token will expire in "+ str(token_expiration) +" days")

    if token_expiration <= warn_days:
        renew_token = send_req(url=vault_url+'/v1/auth/token/renew-self', token=token_param, action='POST', payload={"token":token_param})
        print("Token has been renewed")

def find_other_tokens(k8s_token):
    headers = { 'X-Vault-Token': k8s_token }
        url = vault_url+'/v1/secret/common'
        response = requests.request('GET', url, headers=headers, verify=k8_Cert)

        if response.status_code != 200:
            print('Connecting to ' + url + ' failed with status_code : ' + str(response.status_code))
            #return json.loads(response.text)
            if 'data' in response_json:
                for token_name in response_json['data']:
                    if re.search('-vault-token', token_name):
                        other_tokens.append({"token_name": token_name, "token_value": response_json['data'][ token_name ]})

                print('JSON response does not contain the "data" key')

    except Exception as e:
        print('Exception: ' + str(e))

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--vault-url', help='Vault API url (make sure it does not end with /)', required=False, default='https://XXXXX:8200')
    parser.add_argument('--vault-token', help='Vault Application Access Access Token wrapped', required=True)
    parser.add_argument('--warn-days', help='Number of days for token expiration to warn', required=True)

    args = parser.parse_args()
    vault_url = args.vault_url
    warn_days = int(args.warn_days)

    k8_Cert = '/var/run/secrets/kubernetes.io/serviceaccount/ca.crt'
    if not os.path.exists( k8_Cert ):
        print('K8S cert file ' + k8_Cert + ' does not exist')


    if args.vault_token == "k8s":
        vaultRole = "app-role"
        k8s_token = authVaultUsingK8s(vault_url, vaultRole)
        k8s_token = unwrapIfNeeded(args.vault_token)

    header = {'Content-Type': 'application/json', 'X-Vault-Token': k8s_token}
    backup_dict = {}


    for token_entry in other_tokens:
        print("\n\n\n\nChecking " + token_entry['token_name'] + " for expiration")

I am seeing this random token’s(service account’s) output instead of actual token’s data

token info: {'request_id': '6017f043-b776-c4dc-7b88-3f3260f40fae', 'lease_id': '', 'renewable': False, 'lease_duration': 0, 'data': {'accessor': 'zt9mTKNpeIHnnVRAxxxxxx', 'creation_time': 1676069816, 'creation_ttl': 3600, 'display_name': 'kubernetes-util-vault-auth', 'entity_id': 'a97ad241-737d-4976-d6c5-20eb6fc5da90', 'expire_time': '2023-02-10T18:56:56.943311023-05:00', 'explicit_max_ttl': 0, 'id': 'xxxxxx', 'issue_time': '2023-02-10T17:56:56.943310771-05:00', 'meta': {'role': 'app-role', 'service_account_name': 'vault-auth', 'service_account_namespace': 'util', 'service_account_secret_name': 'vault-auth-token-n4mqb', 'service_account_uid': '00276763-14a1-40f9-b02e-5fd7ba77d524'}, 'num_uses': 0, 'orphan': True, 'path': 'auth/kubernetes/login', 'policies': ['app-role', 'default'], 'renewable': True, 'ttl': 3599, 'type': 'service'}, 'wrap_info': None, 'warnings': None, 'auth': None}

Vault is simply telling you about the token you are passing to it. However, it looks like your code quoted above in fact has a bug, and is passing a token other than the one you intended to pass.

I have edited the original post with the complete code.
The k8s_token is being used to authenticate to vault and look for the actual secrets that have -vault-token pattern and then I am assigning the secret’s value to token_param and trying to get the token_param’s information but for some reason I am seeing different token’s(service_account’s) information.

You are calling the lookup-self endpoint, which is defined to always return information about the token used to authenticate to Vault.

Perhaps you want the lookup endpoint instead?

I tried lookup endpoint as well and its still same.

Even if we use lookup-self endpoint it should return the k8s_token’s(root token) information right? instead it return the service account name’s information which is why I am unable to figure it out

Yes, and it is. By the way, root token means something totally different in the Vault documentation, so that’s a confusing way to describe it.

Uh, because that’s the service account you authenticated as.

Take another look at you send_req function. Notice that the token and payload parameters are entirely unused. Looks like a bug to me.

yes you are correct.
I have passed the required token again in send_req() as headers={'X-Vault-Token':token_param} and its working, thank you.