449 lines
15 KiB
Text
449 lines
15 KiB
Text
|
#!/usr/bin/env python3
|
||
|
|
||
|
# Copyright (c) 2018-2021 Markus Blöchl <ususdei@gmail.com>
|
||
|
#
|
||
|
# This file is part of qutebrowser.
|
||
|
#
|
||
|
# qutebrowser is free software: you can redistribute it and/or modify
|
||
|
# it under the terms of the GNU General Public License as published by
|
||
|
# the Free Software Foundation, either version 3 of the License, or
|
||
|
# (at your option) any later version.
|
||
|
#
|
||
|
# qutebrowser is distributed in the hope that it will be useful,
|
||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
# GNU General Public License for more details.
|
||
|
#
|
||
|
# You should have received a copy of the GNU General Public License
|
||
|
# along with qutebrowser. If not, see <https://www.gnu.org/licenses/>.
|
||
|
|
||
|
"""
|
||
|
# Introduction
|
||
|
|
||
|
This is a [qutebrowser][2] [userscript][5] to fill website credentials from a [KeepassXC][1] password database.
|
||
|
|
||
|
|
||
|
# Installation
|
||
|
|
||
|
First, you need to enable [KeepassXC-Browser][6] extensions in your KeepassXC config.
|
||
|
|
||
|
|
||
|
Second, you must make sure to have a working private-public-key-pair in your [GPG keyring][3].
|
||
|
|
||
|
|
||
|
Third, install the python module `pynacl`.
|
||
|
|
||
|
|
||
|
Finally, adapt your qutebrowser config.
|
||
|
You can e.g. add the following lines to your `~/.config/qutebrowser/config.py`
|
||
|
Remember to replace `ABC1234` with your actual GPG key.
|
||
|
|
||
|
```python
|
||
|
config.bind('<Alt-Shift-u>', 'spawn --userscript qute-keepassxc --key ABC1234', mode='insert')
|
||
|
config.bind('pw', 'spawn --userscript qute-keepassxc --key ABC1234', mode='normal')
|
||
|
```
|
||
|
|
||
|
To manage multiple accounts you also need [rofi](https://github.com/davatorium/rofi) installed.
|
||
|
|
||
|
|
||
|
# Usage
|
||
|
|
||
|
If you are on a webpage with a login form, simply activate one of the configured key-bindings.
|
||
|
|
||
|
The first time you run this script, KeepassXC will ask you for authentication like with any other browser extension.
|
||
|
Just provide a name of your choice and accept the request if nothing looks fishy.
|
||
|
|
||
|
|
||
|
# How it works
|
||
|
|
||
|
This script will talk to KeepassXC using the native [KeepassXC-Browser protocol][4].
|
||
|
|
||
|
|
||
|
This script needs to store the key used to associate with your KeepassXC instance somewhere.
|
||
|
Unlike most browser extensions which only use plain local storage, this one attempts to do so in a safe way
|
||
|
by storing the key in encrypted form using GPG.
|
||
|
Therefore you need to have a GPG private-public-key-pair readily set up.
|
||
|
|
||
|
GPG might then ask for your private-key password whenever you query the database for login credentials.
|
||
|
|
||
|
|
||
|
# TOTP
|
||
|
|
||
|
This script recently received experimental TOTP support.
|
||
|
To use it, you need to have working TOTP authentication within KeepassXC.
|
||
|
Then call `qute-keepassxc` with the `--totp` flag.
|
||
|
|
||
|
For example, I have the following line in my `config.py`:
|
||
|
|
||
|
```python
|
||
|
config.bind('pt', 'spawn --userscript qute-keepassxc --key ABC1234 --totp', mode='normal')
|
||
|
```
|
||
|
|
||
|
For now this script will simply insert the TOTP-token into the currently selected
|
||
|
input field, since I have not yet found a reliable way to identify the correct field
|
||
|
within all existing login forms.
|
||
|
Thus you need to manually select the TOTP input field, press escape to leave input
|
||
|
mode and then enter `pt` to fill in the token (or configure another key-binding for
|
||
|
insert mode if you prefer that).
|
||
|
|
||
|
|
||
|
[1]: https://keepassxc.org/
|
||
|
[2]: https://qutebrowser.org/
|
||
|
[3]: https://gnupg.org/
|
||
|
[4]: https://github.com/keepassxreboot/keepassxc-browser/blob/develop/keepassxc-protocol.md
|
||
|
[5]: https://github.com/qutebrowser/qutebrowser/blob/master/doc/userscripts.asciidoc
|
||
|
[6]: https://keepassxc.org/docs/KeePassXC_GettingStarted.html#_setup_browser_integration
|
||
|
"""
|
||
|
|
||
|
import sys
|
||
|
import os
|
||
|
import socket
|
||
|
import json
|
||
|
import base64
|
||
|
import subprocess
|
||
|
import argparse
|
||
|
|
||
|
import nacl.utils
|
||
|
import nacl.public
|
||
|
|
||
|
|
||
|
def parse_args(argv=None):
    """Parse the command line of the userscript.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) lets
            argparse read ``sys.argv[1:]``.

    Returns:
        An ``argparse.Namespace`` with the attributes ``url``, ``totp``,
        ``socket``, ``key`` and ``insecure``.
    """
    parser = argparse.ArgumentParser(description="Fill passwords from KeepassXC")
    # The URL is optional on the command line; it falls back to the QUTE_URL
    # environment variable (None when that is unset as well).
    parser.add_argument('url', nargs='?', default=os.environ.get('QUTE_URL'))
    parser.add_argument('--totp', action='store_true',
                        help="Fill in current TOTP field instead of username/password")
    parser.add_argument('--socket', '-s',
                        default='/run/user/{}/org.keepassxc.KeePassXC.BrowserServer'.format(os.getuid()),
                        help='Path to KeepassXC browser socket')
    parser.add_argument('--key', '-k', default='alice@example.com',
                        help='GPG key to encrypt KeepassXC auth key with')
    parser.add_argument('--insecure', action='store_true',
                        help="Do not encrypt auth key")
    return parser.parse_args(argv)
|
||
|
|
||
|
|
||
|
class KeepassError(Exception):
    """Error carrying a numeric-style code and a human readable description.

    Args:
        code: Error code (stored as ``self.code``).
        desc: Error text (stored as ``self.description``).
    """

    def __init__(self, code, desc):
        # Forward both values to Exception so that ``args`` is populated;
        # otherwise repr() and pickling of the exception lose all information.
        super().__init__(code, desc)
        self.code = code
        self.description = desc

    def __str__(self):
        return f"KeepassXC Error [{self.code}]: {self.description}"
|
||
|
|
||
|
|
||
|
class KeepassXC:
    """Wrapper around the KeepassXC socket API.

    Messages are JSON objects exchanged over a unix stream socket. After the
    initial key exchange in :meth:`connect`, payloads are encrypted with a
    ``nacl.public.Box`` (see :meth:`send_msg` / :meth:`recv_msg`).
    """

    # Sanity bound for a single response so a malformed peer cannot make
    # recv_raw_msg() buffer without limit.
    # NOTE(review): 1 MiB is a generous guess, confirm against KeepassXC.
    MAX_MESSAGE_SIZE = 1024 * 1024

    def __init__(self, id=None, *, key, socket_path):
        """Prepare (but do not open) a connection.

        Args:
            id: Association id from an earlier run, or None if not associated.
            key: Seed bytes for the identification key pair
                (passed to ``nacl.public.PrivateKey.from_seed``).
            socket_path: Filesystem path of the KeepassXC browser socket.
        """
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.id = id
        self.socket_path = socket_path
        self.client_key = nacl.public.PrivateKey.generate()
        self.id_key = nacl.public.PrivateKey.from_seed(key)
        self.client_id = None   # set by connect()
        self.cryptobox = None   # set by connect()

    @staticmethod
    def _b64(data):
        """Return the bytes *data* base64-encoded as a str."""
        return base64.b64encode(data).decode('utf-8')

    def _id_key_b64(self):
        """Return the public half of the identification key, base64-encoded."""
        return self._b64(self.id_key.public_key.encode())

    def close(self):
        """Close the underlying socket."""
        self.sock.close()

    def connect(self):
        """Open the socket and exchange public keys with KeepassXC.

        Raises:
            KeepassError: if the socket path does not exist, KeepassXC
                reports an error, or the key-exchange reply is incomplete.
        """
        if not os.path.exists(self.socket_path):
            raise KeepassError(-1, "KeepassXC Browser socket does not exists")
        self.client_id = self._b64(nacl.utils.random(nacl.public.Box.NONCE_SIZE))
        self.sock.connect(self.socket_path)

        self.send_raw_msg(dict(
            action='change-public-keys',
            publicKey=self._b64(self.client_key.public_key.encode()),
            nonce=self._b64(nacl.utils.random(nacl.public.Box.NONCE_SIZE)),
            clientID=self.client_id,
        ))

        resp = self.recv_raw_msg()
        if 'error' in resp:
            raise KeepassError(resp.get('errorCode', -1), resp['error'])
        # Explicit checks instead of ``assert`` so they survive ``python -O``.
        handshake_ok = (
            resp.get('action') == 'change-public-keys'
            and resp.get('success') == 'true'
            and resp.get('nonce')
            and resp.get('publicKey')
        )
        if not handshake_ok:
            raise KeepassError(-1, "Key exchange with KeepassXC failed")
        self.cryptobox = nacl.public.Box(
            self.client_key,
            nacl.public.PublicKey(base64.b64decode(resp['publicKey'])),
        )

    def get_databasehash(self):
        """Return the ``hash`` field of the 'get-databasehash' reply."""
        self.send_msg(dict(action='get-databasehash'))
        return self.recv_msg()['hash']

    def lock_database(self):
        """Send 'lock-database'.

        Returns:
            True if KeepassXC answers with error code 1, False if it answers
            without an error. Any other error is re-raised.
        """
        self.send_msg(dict(action='lock-database'))
        try:
            self.recv_msg()
        except KeepassError as e:
            # The code is compared as text so that both 1 and "1" match.
            # NOTE(review): KeepassXC appears to send errorCode as a string.
            if str(e.code) == '1':
                return True
            raise
        return False

    def test_associate(self):
        """Check whether the stored id/key pair is accepted by KeepassXC.

        Returns:
            False when no id is known, otherwise whether the reply reports
            ``success == 'true'``.
        """
        if not self.id:
            return False
        # triggerUnlock is sent outside the encrypted payload; presumably it
        # makes KeepassXC prompt to unlock the database (confirm in protocol).
        self.send_msg(dict(
            action='test-associate',
            id=self.id,
            key=self._id_key_b64(),
        ), triggerUnlock='true')
        return self.recv_msg().get('success') == 'true'

    def associate(self):
        """Request a new association and remember the id KeepassXC returns."""
        self.send_msg(dict(
            action='associate',
            key=self._b64(self.client_key.public_key.encode()),
            idKey=self._id_key_b64(),
        ))
        resp = self.recv_msg()
        self.id = resp['id']

    def get_logins(self, url):
        """Return the ``entries`` list of the 'get-logins' reply for *url*."""
        self.send_msg(dict(
            action='get-logins',
            url=url,
            keys=[{'id': self.id, 'key': self._id_key_b64()}],
        ))
        return self.recv_msg()['entries']

    def get_totp(self, uuid):
        """Return the current TOTP token for entry *uuid*, or None.

        None is returned when the reply is not successful or carries no
        (or an empty) ``totp`` field.
        """
        self.send_msg(dict(
            action='get-totp',
            uuid=uuid,
        ))
        response = self.recv_msg()
        if response.get('success') != 'true':
            return None
        return response.get('totp') or None

    def send_raw_msg(self, msg):
        """Serialize *msg* as JSON and write it to the socket unencrypted."""
        # sendall() keeps writing until everything is sent; send() may stop
        # after a partial write.
        self.sock.sendall(json.dumps(msg).encode('utf-8'))

    def recv_raw_msg(self):
        """Read one JSON message from the socket and return it decoded.

        A stream socket may deliver a message in several pieces, so data is
        accumulated until it parses as a complete JSON document.

        Raises:
            KeepassError: if the peer closes the connection first or the
                response exceeds ``MAX_MESSAGE_SIZE``.
        """
        buf = bytearray()
        while True:
            chunk = self.sock.recv(4096)
            if not chunk:
                raise KeepassError(-1, "Connection closed by KeepassXC")
            buf += chunk
            try:
                # ValueError covers both a truncated JSON document and a
                # UTF-8 sequence split across two chunks.
                return json.loads(buf.decode('utf-8'))
            except ValueError:
                if len(buf) > self.MAX_MESSAGE_SIZE:
                    raise KeepassError(-1, "Response from KeepassXC is too large or malformed")

    def send_msg(self, msg, **extra):
        """Encrypt *msg* and send it.

        Args:
            msg: dict with at least an ``action`` key; it is JSON-encoded and
                encrypted with a fresh random nonce.
            **extra: additional fields added unencrypted to the envelope.
        """
        nonce = nacl.utils.random(nacl.public.Box.NONCE_SIZE)
        payload = json.dumps(msg).encode('utf-8')
        self.send_raw_msg(dict(
            action=msg['action'],
            message=self._b64(self.cryptobox.encrypt(payload, nonce).ciphertext),
            nonce=self._b64(nonce),
            clientID=self.client_id,
            **extra
        ))

    def recv_msg(self):
        """Receive one message, decrypt it and return the decoded payload.

        Raises:
            KeepassError: if the envelope carries an ``error`` field or has
                no ``action``.
        """
        resp = self.recv_raw_msg()
        if 'error' in resp:
            raise KeepassError(resp['errorCode'], resp['error'])
        if not resp.get('action'):
            raise KeepassError(-1, "Malformed response from KeepassXC")
        plaintext = self.cryptobox.decrypt(
            base64.b64decode(resp['message']),
            base64.b64decode(resp['nonce']),
        )
        return json.loads(plaintext.decode('utf-8'))
|
||
|
|
||
|
|
||
|
|
||
|
class SecretKeyStore:
    """Keep the KeepassXC association id and key in a file in QUTE_DATA_DIR.

    The data is a small JSON document (``id`` plus base64 ``key``). It is
    stored as ``keepassxc.key.gpg`` encrypted through the ``gpg`` command, or
    as plain ``keepassxc.key`` when *insecure* is set.
    """

    def __init__(self, gpgkey, insecure):
        """
        Args:
            gpgkey: Recipient passed to ``gpg -r`` when encrypting.
            insecure: If true, store the key unencrypted.

        Raises:
            KeyError: if the QUTE_DATA_DIR environment variable is not set.
        """
        self.gpgkey = gpgkey
        self.insecure = insecure
        # Populated by load(), create() and store().
        self.id = None
        self.key = None
        filename = 'keepassxc.key' if self.insecure else 'keepassxc.key.gpg'
        self.path = os.path.join(os.environ['QUTE_DATA_DIR'], filename)

    def load(self):
        "Load existing association key from file"
        if self.insecure:
            with open(self.path, 'r', encoding='utf-8') as keyfile:
                jsondata = keyfile.read()
        else:
            jsondata = subprocess.check_output(['gpg', '--decrypt', self.path]).decode('utf-8')
        data = json.loads(jsondata)
        self.id = data['id']
        self.key = base64.b64decode(data['key'])

    def create(self):
        "Create new association key"
        self.key = nacl.utils.random(32)
        self.id = None

    def store(self, id):
        "Store newly created association key in file"
        self.id = id
        jsondata = json.dumps({'id': self.id, 'key': base64.b64encode(self.key).decode('utf-8')})
        if self.insecure:
            # The key is written in plain text here, so create the file
            # readable by the owner only instead of with default permissions.
            fd = os.open(self.path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'w', encoding='utf-8') as keyfile:
                keyfile.write(jsondata)
        else:
            subprocess.run(['gpg', '--encrypt', '-o', self.path, '-r', self.gpgkey],
                           input=jsondata.encode('utf-8'), check=True)
|
||
|
|
||
|
|
||
|
def qute(cmd):
    """Write *cmd* followed by a newline to the file named by QUTE_FIFO.

    Raises:
        KeyError: if the QUTE_FIFO environment variable is not set.
    """
    fifo_path = os.environ['QUTE_FIFO']
    with open(fifo_path, 'w') as pipe:
        pipe.writelines((cmd, '\n'))
        pipe.flush()
|
||
|
|
||
|
def error(msg):
    """Report *msg* on stderr and as a qutebrowser ``message-error``.

    The text sent to qutebrowser is sanitized first: the command channel is
    line based and the message is wrapped in double quotes, so raw newlines
    or quotes inside *msg* (e.g. from an exception text) would otherwise cut
    the command short or start a new one.
    """
    print(msg, file=sys.stderr)
    # Collapse line breaks so that the message stays a single command line.
    text = ' '.join(str(msg).splitlines())
    # Backslash-escape backslashes and double quotes inside the quoted argument.
    text = text.replace('\\', '\\\\').replace('"', '\\"')
    qute('message-error "{}"'.format(text))
|
||
|
|
||
|
|
||
|
def connect_to_keepassxc(args):
    """Return a connected and associated KeepassXC client, or None.

    If a key file already exists its association is loaded and verified;
    otherwise a new key is created, associated with KeepassXC and, once the
    association has been verified, written to the key file.

    Args:
        args: Parsed arguments providing ``key``, ``insecure`` and ``socket``.

    Returns:
        The KeepassXC instance, or None after reporting an error.
    """
    if not (args.insecure or args.key):
        error("Missing GPG key to use for auth key encryption")
        return

    keystore = SecretKeyStore(args.key, args.insecure)
    first_run = not os.path.isfile(keystore.path)
    if first_run:
        keystore.create()   # leaves keystore.id as None
    else:
        keystore.load()

    kp = KeepassXC(keystore.id, key=keystore.key, socket_path=args.socket)
    kp.connect()
    if first_run:
        kp.associate()

    if not kp.test_associate():
        error('No KeepassXC association')
        return None

    if first_run:
        # Persist the key only after KeepassXC has confirmed the association.
        keystore.store(kp.id)
    return kp
|
||
|
|
||
|
|
||
|
def select_account(creds):
    """Pick one entry out of *creds*, asking the user via rofi if needed.

    Args:
        creds: Sequence of dicts, each with a ``login`` key.

    Returns:
        The only entry if there is exactly one; otherwise the entry chosen
        in rofi. None if rofi exits with an error status, reports a negative
        index or any other error occurs. If rofi is not installed, an error
        is shown and the first entry is returned.
    """
    try:
        if len(creds) == 1:
            return creds[0]
        rofi_cmd = [
            'rofi', '-dmenu', '-format', 'i', '-matching', 'fuzzy',
            '-p', 'Search',
            '-mesg', '<b>qute-keepassxc</b>: select an account, please!',
        ]
        # One login per line; '-format i' makes rofi print the chosen index.
        menu = b"\n".join([entry['login'].encode('utf-8') for entry in creds])
        choice = int(subprocess.check_output(rofi_cmd, input=menu))
        return creds[choice] if choice >= 0 else None
    except subprocess.CalledProcessError:
        return None
    except FileNotFoundError:
        error("rofi not found. Please install rofi to select from multiple credentials")
        return creds[0]
    except Exception as e:
        error(f"Error while picking account: {e}")
        return None
|
||
|
|
||
|
|
||
|
def make_js_code(username, password):
    """Build the single-line JavaScript snippet that fills a login form.

    The script looks for the first ``<form>`` containing a password input.
    In that form every visible text/email input receives *username* and every
    password input receives *password*; 'input' and 'change' events are
    dispatched for each filled field. If no such form exists an alert is shown.

    Both values are embedded as JSON string literals, and the template lines
    are joined with spaces so that the result contains no newline.

    Args:
        username: Login name to insert.
        password: Password to insert.

    Returns:
        The JavaScript source as one line of text.
    """
    return ' '.join("""
        function isVisible(elem) {
            var style = elem.ownerDocument.defaultView.getComputedStyle(elem, null);

            if (style.getPropertyValue("visibility") !== "visible" ||
                style.getPropertyValue("display") === "none" ||
                style.getPropertyValue("opacity") === "0") {
                return false;
            }

            return elem.offsetWidth > 0 && elem.offsetHeight > 0;
        };

        function hasPasswordField(form) {
            var inputs = form.getElementsByTagName("input");
            for (var j = 0; j < inputs.length; j++) {
                var input = inputs[j];
                if (input.type === "password") {
                    return true;
                }
            }
            return false;
        };

        function loadData2Form (form) {
            var inputs = form.getElementsByTagName("input");
            for (var j = 0; j < inputs.length; j++) {
                var input = inputs[j];
                if (isVisible(input) && (input.type === "text" || input.type === "email")) {
                    input.focus();
                    input.value = %s;
                    input.dispatchEvent(new Event('input', { 'bubbles': true }));
                    input.dispatchEvent(new Event('change', { 'bubbles': true }));
                    input.blur();
                }
                if (input.type === "password") {
                    input.focus();
                    input.value = %s;
                    input.dispatchEvent(new Event('input', { 'bubbles': true }));
                    input.dispatchEvent(new Event('change', { 'bubbles': true }));
                    input.blur();
                }
            }
        };

        function fillFirstForm() {
            var forms = document.getElementsByTagName("form");
            for (var i = 0; i < forms.length; i++) {
                if (hasPasswordField(forms[i])) {
                    loadData2Form(forms[i]);
                    return;
                }
            }
            alert("No Credentials Form found");
        };

        fillFirstForm()
    """.splitlines()) % (json.dumps(username), json.dumps(password))
|
||
|
|
||
|
|
||
|
def make_js_totp_code(totp):
    """Build the single-line JavaScript snippet that inserts a TOTP token.

    The script writes *totp* (embedded as a JSON string literal) into the
    currently focused element if that is an ``<input>`` and dispatches
    'input' and 'change' events; otherwise it shows an alert.

    Args:
        totp: The token to insert.

    Returns:
        The JavaScript source as one line of text.
    """
    template = """
        (function () {
            var input = document.activeElement;
            if (!input || input.tagName !== "INPUT") {
                alert("No TOTP input field selected");
                return;
            }
            input.value = %s;
            input.dispatchEvent(new Event('input', { 'bubbles': true }));
            input.dispatchEvent(new Event('change', { 'bubbles': true }));
        })();
    """
    # Joining the lines with spaces keeps the snippet free of newlines.
    one_line = ' '.join(template.splitlines())
    token_literal = json.dumps(totp)
    return one_line % (token_literal,)
|
||
|
|
||
|
|
||
|
def main():
    """Entry point: look up credentials for the current URL and fill them in.

    Exits with status -1 when QUTE_FIFO is not set. Every other failure is
    reported through error() and ends the function without raising.
    """
    if 'QUTE_FIFO' not in os.environ:
        # error() needs the FIFO, so this diagnostic can only go to stderr.
        print(f"No QUTE_FIFO found - {sys.argv[0]} must be run as a qutebrowser userscript",
              file=sys.stderr)
        sys.exit(-1)

    try:
        args = parse_args()
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if not args.url:
            error("Missing URL")
            return
        kp = connect_to_keepassxc(args)
        if not kp:
            error('Could not connect to KeepassXC')
            return
        creds = kp.get_logins(args.url)
        if not creds:
            error('No credentials found')
            return
        cred = select_account(creds)
        if not cred:
            error('No credentials selected')
            return
        if args.totp:
            totp = kp.get_totp(cred['uuid'])
            if not totp:
                error('No TOTP key found')
                return
            qute('jseval -q ' + make_js_totp_code(totp))
        else:
            name, pw = cred['login'], cred['password']
            if not (name and pw):
                # Tell the user why nothing was filled in instead of
                # silently doing nothing.
                error('Selected entry has no username or password')
                return
            qute('jseval -q ' + make_js_code(name, pw))
    except Exception as e:
        # Top-level boundary: show any failure inside qutebrowser.
        error(str(e))
|
||
|
|
||
|
|
||
|
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
|
||
|
|