"""
Implementation of "fedcloud token" commands for interactions with EGI Check-in and
access tokens
"""
import re
import sys
import time
from datetime import datetime
import click
import jwt
import liboidcagent as agent
import requests
from fedcloudclient.decorators import (
oidc_access_token_params,
oidc_params,
oidc_refresh_token_params,
)
[docs]def oidc_discover(oidc_url):
"""
Discover oidc endpoints
:param oidc_url: CheckIn URL
:return: JSON object of OIDC configuration
"""
r = requests.get(oidc_url + "/.well-known/openid-configuration")
r.raise_for_status()
return r.json()
[docs]def token_refresh(oidc_client_id, oidc_client_secret, oidc_refresh_token, oidc_url):
"""
Helper function for retrieving JSON object with access token
:param oidc_client_id:
:param oidc_client_secret:
:param oidc_refresh_token:
:param oidc_url:
:return: JSON object with access token
"""
oidc_ep = oidc_discover(oidc_url)
refresh_data = {
"client_id": oidc_client_id,
"client_secret": oidc_client_secret,
"grant_type": "refresh_token",
"refresh_token": oidc_refresh_token,
"scope": "openid email profile offline_access",
}
r = requests.post(
oidc_ep["token_endpoint"],
auth=(oidc_client_id, oidc_client_secret),
data=refresh_data,
)
r.raise_for_status()
return r.json()
[docs]def refresh_access_token(
oidc_client_id, oidc_client_secret, oidc_refresh_token, oidc_url
):
"""
Retrieve access token in plain text (string)
:param oidc_client_id:
:param oidc_client_secret:
:param oidc_refresh_token:
:param oidc_url:
:return: access token
"""
return token_refresh(
oidc_client_id,
oidc_client_secret,
oidc_refresh_token,
oidc_url,
)["access_token"]
[docs]def get_access_token(
oidc_access_token,
oidc_refresh_token,
oidc_client_id,
oidc_client_secret,
oidc_url,
oidc_agent_account,
):
"""
Getting access token.
Generate new access token from oidc-agent or refresh token (if given)
or use existing token
Check expiration time of access token
Raise error if no valid token exists
:param oidc_access_token:
:param oidc_refresh_token:
:param oidc_client_id:
:param oidc_client_secret:
:param oidc_url:
:param oidc_agent_account:
:return: access token
"""
# First, try to get access token from oidc-agent
if oidc_agent_account:
try:
access_token = agent.get_access_token(
oidc_agent_account,
min_valid_period=30,
application_hint="fedcloudclient",
)
return access_token
except agent.OidcAgentError as e:
print("ERROR oidc-agent: {}".format(e))
# Then try refresh token
if oidc_refresh_token and oidc_client_id and oidc_client_secret and oidc_url:
print(
"WARNING: exposing refresh tokens is insecure and will be disabled in "
"next version!",
file=sys.stderr,
)
return token_refresh(
oidc_client_id, oidc_client_secret, oidc_refresh_token, oidc_url
)["access_token"]
# Then finally access token
elif oidc_access_token:
# Check expiration time of access token
try:
payload = jwt.decode(oidc_access_token, options={"verify_signature": False})
except jwt.exceptions.InvalidTokenError:
raise SystemExit("Error: Invalid access token.")
expiration_timestamp = int(payload["exp"])
current_timestamp = int(time.time())
if current_timestamp > expiration_timestamp - 10:
raise SystemExit(
"The given access token has expired."
+ " Get new access token before continuing on operation"
)
return oidc_access_token
else:
raise SystemExit(
"Error: An access token is needed for the operation. You can specify "
"access token directly via --oidc-access-token option or use oidc-agent "
"via --oidc-agent-account"
)
[docs]def token_list_vos(oidc_access_token, oidc_url):
"""
List VO memberships in EGI Check-in
:param oidc_access_token:
:param oidc_url:
:return: list of VO names
"""
oidc_ep = oidc_discover(oidc_url)
r = requests.get(
oidc_ep["userinfo_endpoint"],
headers={"Authorization": "Bearer %s" % oidc_access_token},
)
r.raise_for_status()
vos = []
m = re.compile("urn:mace:egi.eu:group:(.+?):(.+:)*role=member#aai.egi.eu")
for claim in r.json().get("eduperson_entitlement", []):
vo = m.match(claim)
if vo:
vos.append(vo.groups()[0])
return vos
@click.group()
def token():
"""
Token command group for manipulation with tokens
"""
pass
@token.command()
@oidc_refresh_token_params
@oidc_access_token_params
def check(oidc_refresh_token, oidc_access_token):
"""
CLI command for printing validity of access or refresh token
"""
if oidc_refresh_token:
try:
payload = jwt.decode(
oidc_refresh_token, options={"verify_signature": False}
)
except jwt.exceptions.InvalidTokenError:
raise SystemExit("Error: Invalid refresh token.")
expiration_timestamp = int(payload["exp"])
expiration_time = datetime.utcfromtimestamp(expiration_timestamp).strftime(
"%Y-%m-%d %H:%M:%S"
)
print("Refresh token is valid to %s UTC" % expiration_time)
current_timestamp = int(time.time())
if current_timestamp < expiration_timestamp:
print(
"Refresh token expires in %d days"
% ((expiration_timestamp - current_timestamp) // (24 * 3600))
)
else:
print("Refresh token has expired")
elif oidc_access_token:
try:
payload = jwt.decode(oidc_access_token, options={"verify_signature": False})
except jwt.exceptions.InvalidTokenError:
raise SystemExit("Error: Invalid access token.")
expiration_timestamp = int(payload["exp"])
expiration_time = datetime.utcfromtimestamp(expiration_timestamp).strftime(
"%Y-%m-%d %H:%M:%S"
)
print("Access token is valid to %s UTC" % expiration_time)
current_timestamp = int(time.time())
if current_timestamp < expiration_timestamp:
print(
"Access token expires in %d seconds"
% (expiration_timestamp - current_timestamp)
)
else:
print("Access token has expired")
else:
print("OIDC access token or refresh token required")
exit(1)
@token.command()
@oidc_params
def list_vos(
oidc_client_id,
oidc_client_secret,
oidc_refresh_token,
oidc_access_token,
oidc_url,
oidc_agent_account,
):
"""
CLI command for listing VO memberships according to access token
"""
oidc_access_token = get_access_token(
oidc_access_token,
oidc_refresh_token,
oidc_client_id,
oidc_client_secret,
oidc_url,
oidc_agent_account,
)
vos = token_list_vos(oidc_access_token, oidc_url)
print("\n".join(vos))