aboutsummaryrefslogtreecommitdiff
path: root/bin/executable_mutt_oauth2.py
diff options
context:
space:
mode:
Diffstat (limited to 'bin/executable_mutt_oauth2.py')
-rw-r--r--bin/executable_mutt_oauth2.py443
1 files changed, 0 insertions, 443 deletions
diff --git a/bin/executable_mutt_oauth2.py b/bin/executable_mutt_oauth2.py
deleted file mode 100644
index 559811f..0000000
--- a/bin/executable_mutt_oauth2.py
+++ /dev/null
@@ -1,443 +0,0 @@
-#!/usr/bin/env python3
-#
-# Mutt OAuth2 token management script, version 2020-08-07
-# Written against python 3.7.3, not tried with earlier python versions.
-#
-# Copyright (C) 2020 Alexander Perlis
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License as
-# published by the Free Software Foundation; either version 2 of the
-# License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-# 02110-1301, USA.
-'''Mutt OAuth2 token management'''
-
-import sys
-import json
-import argparse
-import urllib.parse
-import urllib.request
-import imaplib
-import poplib
-import smtplib
-import base64
-import secrets
-import hashlib
-import time
-from datetime import timedelta, datetime
-from pathlib import Path
-import shlex
-import socket
-import http.server
-import subprocess
-import readline
-
-# The token file must be encrypted because it contains multi-use bearer tokens
-# whose usage does not require additional verification. Specify whichever
-# encryption and decryption pipes you prefer. They should read from standard
-# input and write to standard output. The example values here invoke GPG.
-ENCRYPTION_PIPE = ['gpg', '--encrypt', '--default-recipient-self']
-DECRYPTION_PIPE = ['gpg', '--decrypt']
-
-registrations = {
- 'google': {
- 'authorize_endpoint': 'https://accounts.google.com/o/oauth2/auth',
- 'devicecode_endpoint': 'https://oauth2.googleapis.com/device/code',
- 'token_endpoint': 'https://accounts.google.com/o/oauth2/token',
- 'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob',
- 'imap_endpoint': 'imap.gmail.com',
- 'pop_endpoint': 'pop.gmail.com',
- 'smtp_endpoint': 'smtp.gmail.com',
- 'sasl_method': 'OAUTHBEARER',
- 'scope': 'https://mail.google.com/',
- },
- 'microsoft': {
- 'authorize_endpoint': 'https://login.microsoftonline.com/common/oauth2/v2.0/authorize',
- 'devicecode_endpoint': 'https://login.microsoftonline.com/common/oauth2/v2.0/devicecode',
- 'token_endpoint': 'https://login.microsoftonline.com/common/oauth2/v2.0/token',
- 'redirect_uri': 'https://login.microsoftonline.com/common/oauth2/nativeclient',
- 'tenant': 'common',
- 'imap_endpoint': 'outlook.office365.com',
- 'pop_endpoint': 'outlook.office365.com',
- 'smtp_endpoint': 'smtp.office365.com',
- 'sasl_method': 'XOAUTH2',
- 'scope': ('offline_access https://outlook.office.com/IMAP.AccessAsUser.All '
- 'https://outlook.office.com/POP.AccessAsUser.All '
- 'https://outlook.office.com/SMTP.Send'),
- },
-}
-
-ap = argparse.ArgumentParser(epilog='''
-This script obtains and prints a valid OAuth2 access token. State is maintained in an
-encrypted TOKENFILE. Run with "--verbose --authorize --encryption-pipe 'foo@bar.org'"
-to get started or whenever all tokens have expired, optionally with "--authflow" to override
-the default authorization flow. To truly start over from scratch, first delete TOKENFILE.
-Use "--verbose --test" to test the IMAP/POP/SMTP endpoints.
-''')
-ap.add_argument('-v', '--verbose', action='store_true', help='increase verbosity')
-ap.add_argument('-d', '--debug', action='store_true', help='enable debug output')
-ap.add_argument('tokenfile', help='persistent token storage')
-ap.add_argument('-a', '--authorize', action='store_true', help='manually authorize new tokens')
-ap.add_argument('--authflow', help='authcode | localhostauthcode | devicecode')
-ap.add_argument('-t', '--test', action='store_true', help='test IMAP/POP/SMTP endpoints')
-ap.add_argument('--decryption-pipe', type=shlex.split, default=DECRYPTION_PIPE,
- help='decryption command (string), reads from stdin and writes '
- 'to stdout, default: "{}"'.format(
- " ".join(DECRYPTION_PIPE)))
-ap.add_argument('--encryption-pipe', type=shlex.split, default=ENCRYPTION_PIPE,
- help='encryption command (string), reads from stdin and writes '
- 'to stdout, suggested: "{}"'.format(
- " ".join(ENCRYPTION_PIPE)))
-ap.add_argument('--client-id', type=str, default='',
- help='Provider id from registration')
-ap.add_argument('--client-secret', type=str, default='',
- help='(optional) Provider secret from registration')
-ap.add_argument('--provider', type=str, choices=registrations.keys(),
- help='Specify provider to use.')
-ap.add_argument('--email', type=str, help='Your email address.')
-args = ap.parse_args()
-
-ENCRYPTION_PIPE = args.encryption_pipe
-DECRYPTION_PIPE = args.decryption_pipe
-
-token = {}
-path = Path(args.tokenfile)
-if path.exists():
- if 0o777 & path.stat().st_mode != 0o600:
- sys.exit('Token file has unsafe mode. Suggest deleting and starting over.')
- try:
- sub = subprocess.run(DECRYPTION_PIPE, check=True, input=path.read_bytes(),
- capture_output=True)
- token = json.loads(sub.stdout)
- except subprocess.CalledProcessError:
- sys.exit('Difficulty decrypting token file. Is your decryption agent primed for '
- 'non-interactive usage, or an appropriate environment variable such as '
- 'GPG_TTY set to allow interactive agent usage from inside a pipe?')
-
-
-def writetokenfile():
- '''Writes global token dictionary into token file.'''
- if not path.exists():
- path.touch(mode=0o600)
- if 0o777 & path.stat().st_mode != 0o600:
- sys.exit('Token file has unsafe mode. Suggest deleting and starting over.')
- sub2 = subprocess.run(ENCRYPTION_PIPE, check=True, input=json.dumps(token).encode(),
- capture_output=True)
- path.write_bytes(sub2.stdout)
-
-
-if args.debug:
- print('Obtained from token file:', json.dumps(token))
-if not token:
- if not args.authorize:
- sys.exit('You must run script with "--authorize" at least once.')
- if not ENCRYPTION_PIPE:
- sys.exit("You need to provide a suitable --encryption-pipe setting")
- print('', )
- token['registration'] = args.provider or input(
- 'Available app and endpoint registrations: {regs}\nOAuth2 registration: '.format(
- regs=', '.join(registrations.keys())))
- token['authflow'] = args.authflow or input(
- 'Preferred OAuth2 flow ("authcode" or "localhostauthcode" or "devicecode"): '
- )
- token['email'] = args.email or input('Account e-mail address: ')
- token['access_token'] = ''
- token['access_token_expiration'] = ''
- token['refresh_token'] = ''
- token['client_id'] = args.client_id or input('Client ID: ')
- token['client_secret'] = args.client_secret or input('Client secret: ')
- writetokenfile()
-
-if token['registration'] not in registrations:
- sys.exit(f'ERROR: Unknown registration "{token["registration"]}". Delete token file '
- f'and start over.')
-registration = registrations[token['registration']]
-
-authflow = token['authflow']
-if args.authflow:
- authflow = args.authflow
-
-baseparams = {'client_id': token['client_id']}
-# Microsoft uses 'tenant' but Google does not
-if 'tenant' in registration:
- baseparams['tenant'] = registration['tenant']
-
-
-def access_token_valid():
- '''Returns True when stored access token exists and is still valid at this time.'''
- token_exp = token['access_token_expiration']
- return token_exp and datetime.now() < datetime.fromisoformat(token_exp)
-
-
-def update_tokens(r):
- '''Takes a response dictionary, extracts tokens out of it, and updates token file.'''
- token['access_token'] = r['access_token']
- token['access_token_expiration'] = (datetime.now() +
- timedelta(seconds=int(r['expires_in']))).isoformat()
- if 'refresh_token' in r:
- token['refresh_token'] = r['refresh_token']
- writetokenfile()
- if args.verbose:
- print(f'NOTICE: Obtained new access token, expires {token["access_token_expiration"]}.')
-
-
-if args.authorize:
- p = baseparams.copy()
- p['scope'] = registration['scope']
-
- if authflow in ('authcode', 'localhostauthcode'):
- verifier = secrets.token_urlsafe(90)
- challenge = base64.urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest())[:-1]
- redirect_uri = registration['redirect_uri']
- listen_port = 0
- if authflow == 'localhostauthcode':
- # Find an available port to listen on
- s = socket.socket()
- s.bind(('127.0.0.1', 0))
- listen_port = s.getsockname()[1]
- s.close()
- redirect_uri = 'http://localhost:'+str(listen_port)+'/'
- # Probably should edit the port number into the actual redirect URL.
-
- p.update({'login_hint': token['email'],
- 'response_type': 'code',
- 'redirect_uri': redirect_uri,
- 'code_challenge': challenge,
- 'code_challenge_method': 'S256'})
- print(registration["authorize_endpoint"] + '?' +
- urllib.parse.urlencode(p, quote_via=urllib.parse.quote))
-
- authcode = ''
- if authflow == 'authcode':
- authcode = input('Visit displayed URL to retrieve authorization code. Enter '
- 'code from server (might be in browser address bar): ')
- else:
- print('Visit displayed URL to authorize this application. Waiting...',
- end='', flush=True)
-
- class MyHandler(http.server.BaseHTTPRequestHandler):
- '''Handles the browser query resulting from redirect to redirect_uri.'''
-
- # pylint: disable=C0103
- def do_HEAD(self):
- '''Response to a HEAD requests.'''
- self.send_response(200)
- self.send_header('Content-type', 'text/html')
- self.end_headers()
-
- def do_GET(self):
- '''For GET request, extract code parameter from URL.'''
- # pylint: disable=W0603
- global authcode
- querystring = urllib.parse.urlparse(self.path).query
- querydict = urllib.parse.parse_qs(querystring)
- if 'code' in querydict:
- authcode = querydict['code'][0]
- self.do_HEAD()
- self.wfile.write(b'<html><head><title>Authorization result</title></head>')
- self.wfile.write(b'<body><p>Authorization redirect completed. You may '
- b'close this window.</p></body></html>')
- with http.server.HTTPServer(('127.0.0.1', listen_port), MyHandler) as httpd:
- try:
- httpd.handle_request()
- except KeyboardInterrupt:
- pass
-
- if not authcode:
- sys.exit('Did not obtain an authcode.')
-
- for k in 'response_type', 'login_hint', 'code_challenge', 'code_challenge_method':
- del p[k]
- p.update({'grant_type': 'authorization_code',
- 'code': authcode,
- 'client_secret': token['client_secret'],
- 'code_verifier': verifier})
- print('Exchanging the authorization code for an access token')
- try:
- response = urllib.request.urlopen(registration['token_endpoint'],
- urllib.parse.urlencode(p).encode())
- except urllib.error.HTTPError as err:
- print(err.code, err.reason)
- response = err
- response = response.read()
- if args.debug:
- print(response)
- response = json.loads(response)
- if 'error' in response:
- print(response['error'])
- if 'error_description' in response:
- print(response['error_description'])
- sys.exit(1)
-
- elif authflow == 'devicecode':
- try:
- response = urllib.request.urlopen(registration['devicecode_endpoint'],
- urllib.parse.urlencode(p).encode())
- except urllib.error.HTTPError as err:
- print(err.code, err.reason)
- response = err
- response = response.read()
- if args.debug:
- print(response)
- response = json.loads(response)
- if 'error' in response:
- print(response['error'])
- if 'error_description' in response:
- print(response['error_description'])
- sys.exit(1)
- print(response['message'])
- del p['scope']
- p.update({'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
- 'client_secret': registration['client_secret'],
- 'device_code': response['device_code']})
- interval = int(response['interval'])
- print('Polling...', end='', flush=True)
- while True:
- time.sleep(interval)
- print('.', end='', flush=True)
- try:
- response = urllib.request.urlopen(registration['token_endpoint'],
- urllib.parse.urlencode(p).encode())
- except urllib.error.HTTPError as err:
- # Not actually always an error, might just mean "keep trying..."
- response = err
- response = response.read()
- if args.debug:
- print(response)
- response = json.loads(response)
- if 'error' not in response:
- break
- if response['error'] == 'authorization_declined':
- print(' user declined authorization.')
- sys.exit(1)
- if response['error'] == 'expired_token':
- print(' too much time has elapsed.')
- sys.exit(1)
- if response['error'] != 'authorization_pending':
- print(response['error'])
- if 'error_description' in response:
- print(response['error_description'])
- sys.exit(1)
- print()
-
- else:
- sys.exit(f'ERROR: Unknown OAuth2 flow "{token["authflow"]}. Delete token file and '
- f'start over.')
-
- update_tokens(response)
-
-
-if not access_token_valid():
- if args.verbose:
- print('NOTICE: Invalid or expired access token; using refresh token '
- 'to obtain new access token.')
- if not token['refresh_token']:
- sys.exit('ERROR: No refresh token. Run script with "--authorize".')
- p = baseparams.copy()
- p.update({'client_id': token['client_id'],
- 'client_secret': token['client_secret'],
- 'refresh_token': token['refresh_token'],
- 'grant_type': 'refresh_token'})
- try:
- response = urllib.request.urlopen(registration['token_endpoint'],
- urllib.parse.urlencode(p).encode())
- except urllib.error.HTTPError as err:
- print(err.code, err.reason)
- response = err
- response = response.read()
- if args.debug:
- print(response)
- response = json.loads(response)
- if 'error' in response:
- print(response['error'])
- if 'error_description' in response:
- print(response['error_description'])
- print('Perhaps refresh token invalid. Try running once with "--authorize"')
- sys.exit(1)
- update_tokens(response)
-
-
-if not access_token_valid():
- sys.exit('ERROR: No valid access token. This should not be able to happen.')
-
-
-if args.verbose:
- print('Access Token: ', end='')
-print(token['access_token'])
-
-
-def build_sasl_string(user, host, port, bearer_token):
- '''Build appropriate SASL string, which depends on cloud server's supported SASL method.'''
- if registration['sasl_method'] == 'OAUTHBEARER':
- return f'n,a={user},\1host={host}\1port={port}\1auth=Bearer {bearer_token}\1\1'
- if registration['sasl_method'] == 'XOAUTH2':
- return f'user={user}\1auth=Bearer {bearer_token}\1\1'
- sys.exit(f'Unknown SASL method {registration["sasl_method"]}.')
-
-
-if args.test:
- errors = False
-
- imap_conn = imaplib.IMAP4_SSL(registration['imap_endpoint'])
- sasl_string = build_sasl_string(token['email'], registration['imap_endpoint'], 993,
- token['access_token'])
- if args.debug:
- imap_conn.debug = 4
- try:
- imap_conn.authenticate(registration['sasl_method'], lambda _: sasl_string.encode())
- # Microsoft has a bug wherein a mismatch between username and token can still report a
- # successful login... (Try a consumer login with the token from a work/school account.)
- # Fortunately subsequent commands fail with an error. Thus we follow AUTH with another
- # IMAP command before reporting success.
- imap_conn.list()
- if args.verbose:
- print('IMAP authentication succeeded')
- except imaplib.IMAP4.error as e:
- print('IMAP authentication FAILED (does your account allow IMAP?):', e)
- errors = True
-
- pop_conn = poplib.POP3_SSL(registration['pop_endpoint'])
- sasl_string = build_sasl_string(token['email'], registration['pop_endpoint'], 995,
- token['access_token'])
- if args.debug:
- pop_conn.set_debuglevel(2)
- try:
- # poplib doesn't have an auth command taking an authenticator object
- # Microsoft requires a two-line SASL for POP
- # pylint: disable=W0212
- pop_conn._shortcmd('AUTH ' + registration['sasl_method'])
- pop_conn._shortcmd(base64.standard_b64encode(sasl_string.encode()).decode())
- if args.verbose:
- print('POP authentication succeeded')
- except poplib.error_proto as e:
- print('POP authentication FAILED (does your account allow POP?):', e.args[0].decode())
- errors = True
-
- # SMTP_SSL would be simpler but Microsoft does not answer on port 465.
- smtp_conn = smtplib.SMTP(registration['smtp_endpoint'], 587)
- sasl_string = build_sasl_string(token['email'], registration['smtp_endpoint'], 587,
- token['access_token'])
- smtp_conn.ehlo('test')
- smtp_conn.starttls()
- smtp_conn.ehlo('test')
- if args.debug:
- smtp_conn.set_debuglevel(2)
- try:
- smtp_conn.auth(registration['sasl_method'], lambda _=None: sasl_string)
- if args.verbose:
- print('SMTP authentication succeeded')
- except smtplib.SMTPAuthenticationError as e:
- print('SMTP authentication FAILED:', e)
- errors = True
-
- if errors:
- sys.exit(1)