1 Star 0 Fork 1

AJay13 / guacamole-py

forked from error / guacamole-py 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
guacamole.py 15.78 KB
一键复制 编辑 原始数据 按行查看 历史
error 提交于 2017-03-09 16:27 . 增加kill session
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
"""
@version: 1.0
@author: readerror
@contact: readerror@readerror.cn
@contact1: readerror000@gmail.com
@site: http://www.readerror.cn
@software: PyCharm
@file: guacamole.py
@time: 2017/2/15 8:50
"""
import requests
import json
import logging
import base64
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
class User(object):
def list_user(self):
url = '/'.join([self.base_api_url, 'session', 'data', self.data_sources, 'users' + self.token])
return json.loads(requests.get(url=url, params={'token': self.token},
headers={'Content-Type': 'application/json;charset=UTF-8'}).text)
def create_user(self, username, password):
data = {"username": username,
"password": password,
"attributes": {
"disabled": "",
"expired": "",
"access-window-start": "",
"access-window-end": "",
"valid-from": "",
"valid-until": "",
"timezone": None}
}
url = '/'.join([self.base_api_url, 'session', 'data', self.data_sources, 'users'])
return json.loads(requests.post(url=url, json=data, params={'token': self.token},
headers={'Content-Type': 'application/json;charset=UTF-8'}).text)
def update_user(self, username, new_password, new_username=''):
if new_username == '':
new_username = username
data = {"username": new_username,
"password": new_password,
"attributes": {
"disabled": "",
"expired": "",
"access-window-start": "",
"access-window-end": "",
"valid-from": "",
"valid-until": "",
"timezone": None}
}
url = '/'.join([self.base_api_url, 'session', 'data', self.data_sources, 'users', username])
return requests.put(url=url, json=data, params={'token': self.token},
headers={'Content-Type': 'application/json;charset=UTF-8'}).text
def delete_user(self, username):
url = '/'.join([self.base_api_url, 'session', 'data', self.data_sources, 'users', username])
return requests.delete(url=url, params={'token': self.token}).text
def permissions(self, username, connection_id, connection_group_id):
data = [
{
"op": "add",
"path": r"/connectionPermissions/" + str(connection_id),
"value": "READ"
},
]
if connection_group_id != 'ROOT':
data.append({
"op": "add",
"path": r"/connectionGroupPermissions/" + str(connection_group_id),
"value": "READ"
})
url = '/'.join([self.base_api_url, 'session', 'data', self.data_sources, 'users', username, 'permissions'])
return requests.patch(url=url, json=data, params={'token': self.token},
headers={'Content-Type': 'application/json;charset=UTF-8'}).text
def get_user_info(self, username, password):
_client = Client(username, password, self.host, self.port, self.server_path)
_client.login()
url = '/'.join([self.base_api_url, 'session', 'data', self.data_sources, 'connectionGroups', 'ROOT', 'tree'])
return json.loads(requests.get(url=url, params={'token': _client.token}).text)
def get_user_group_id(self, username):
try:
info = self.get_user_info(self.username, self.password)['childConnectionGroups']
for group in info:
if group.get('name', '') == username:
return group.get('identifier', None)
except KeyError:
log.warning('ConnectionGroups list empty, ')
# raise KeyError('ConnectionGroupsListEmpty')
def get_user_connection_id(self, username, connection_name=''):
if connection_name == '':
connection_name = username
try:
info = self.get_user_info(self.username, self.password)['childConnectionGroups']
for group in info:
if group.get('name', 'ROOT') == username:
for connection in group.get('childConnections', [{}]):
if connection.get('name', '') == connection_name:
return connection.get('identifier', None)
except KeyError:
log.warning('ConnectionGroups list empty')
# raise KeyError('ConnectionGroupsListEmpty')
def get_connections_id(self, connection_name):
# TODO Deep 2 now, more
connections_id = []
try:
info = self.get_user_info(self.username, self.password)
for group in info.get('childConnectionGroups', [{}]):
for connection in group.get('childConnections', [{}]):
if connection.get('name', '') == connection_name:
connections_id.append(connection.get('identifier', None))
for connection in info.get('childConnections', [{}]):
if connection.get('name', '') == connection_name:
connections_id.append(connection.get('identifier', None))
except KeyError:
log.warning('ConnectionGroups list empty')
# raise KeyError('ConnectionGroupsListEmpty')
return connections_id
def get_user_permissions(self, username):
url = '/'.join([self.base_api_url, 'session', 'data', self.data_sources, 'users', username, 'permissions'])
return json.loads(requests.get(url=url, params={'token': self.token}).text)
class ConnectionGroup(object):
def create_connection_group(self, name):
data = {
"parentIdentifier": "ROOT",
"name": name,
"type": "ORGANIZATIONAL",
"attributes": {
"max-connections": "1",
"max-connections-per-user": "1",
"enable-session-affinity": "false"
}
}
url = '/'.join([self.base_api_url, 'session', 'data', self.data_sources, 'connectionGroups'])
return json.loads(requests.post(url=url, json=data, params={'token': self.token},
headers={'Content-Type': 'application/json;charset=UTF-8'}).text)
def delete_connection_group(self, connection_group_id):
url = '/'.join([self.base_api_url, 'session', 'data',
self.data_sources, 'connectionGroups', str(connection_group_id)])
return requests.delete(url=url, params={'token': self.token}).text
class Connection(object):
def get_active_connections(self):
url = '/'.join([self.base_api_url, 'session', 'data', self.data_sources, 'activeConnections'])
return json.loads(requests.get(url=url, params={'token': self.token}).text)
def kill_active_connection(self, identifier):
data = [{
"op": "remove", "path": ''.join(["/", identifier])
}]
url = '/'.join([self.base_api_url, 'session', 'data', self.data_sources, 'activeConnections'])
return requests.patch(url=url, json=data, params={'token': self.token},
headers={'Content-Type': 'application/json;charset=UTF-8'}).text
def kill_user_active_connections(self, username):
data = {}
for k, v in self.get_active_connections().items():
if v.get('username') == username:
data[k] = self.kill_active_connection(v.get('identifier'))
return data
def create_rdp_connection(self, name, hostname, username, password, parent_identifier='ROOT', performance=False):
data = {
"parentIdentifier": str(parent_identifier),
"name": name,
"protocol": 'rdp',
"parameters": {
"port": "3389",
"read-only": "",
"swap-red-blue": "",
"cursor": "",
"color-depth": "",
"clipboard-encoding": "",
"dest-port": "",
"create-recording-path": "",
"enable-sftp": "",
"sftp-port": "",
"enable-audio": "true" if performance > 4 else "",
"security": "",
"disable-auth": "",
"ignore-cert": "",
"server-layout": "",
"console": "",
"width": "",
"height": "",
"dpi": "",
"resize-method": "",
"console-audio": "",
"disable-audio": "",
"enable-audio-input": "true" if performance > 4 else "",
"enable-printing": "true" if performance > 4 else "",
"enable-drive": "true" if performance > 4 else "",
"create-drive-path": "",
"enable-wallpaper": "true" if performance > 0 else "",
"enable-theming": "true" if performance > 1 else "",
"enable-font-smoothing": "true" if performance > 2 else "", # (ClearType)
"enable-full-window-drag": "true" if performance > 2 else "",
"enable-desktop-composition": "true" if performance > 3 else "", # (Aero)
"enable-menu-animations": "true" if performance > 3 else "",
"preconnection-id": "",
"hostname": hostname,
"username": username,
"password": password
},
"attributes": {
"max-connections": "1",
"max-connections-per-user": "1"
}
}
url = '/'.join([self.base_api_url, 'session', 'data', self.data_sources, 'connections'])
return json.loads(requests.post(url=url, json=data, params={'token': self.token},
headers={'Content-Type': 'application/json;charset=UTF-8'}).text)
def delete_connection(self, connection_id):
url = '/'.join([self.base_api_url, 'session', 'data', self.data_sources, 'connections', str(connection_id)])
return requests.delete(url=url, params={'token': self.token}).text
def get_connection_info(self, connection_id):
url = '/'.join([self.base_api_url, 'session', 'data', self.data_sources, 'connections', str(connection_id)])
return json.loads(requests.get(url=url, params={'token': self.token}).text)
def can_connect(self, connection_id):
info = self.get_connection_info(connection_id)
if info.get('message', None):
log.error(info.get('message'))
return False
else:
return True
class Client(User, ConnectionGroup, Connection):
def __init__(self, username, password, host='127.0.0.1', port=8080, server_path=''):
super(Client, self).__init__()
self.CONNECTION_TYPE = {
'connection': 'c'
}
self.username = username
self.password = password
self.token = ''
self.available_data_sources = ''
self.data_sources = ''
self.host = host
self.port = port
self.server_path = server_path
self.base_url = ''.join([r'http://', '/'.join([':'.join([host, str(self.port)]), server_path, '#'])])
self.base_api_url = ''.join([r'http://', '/'.join([':'.join([host, str(self.port)]), server_path, 'api'])])
# self.login()
# def __del__(self):
# if self.token is not '':
# self.logout()
# for method in [getattr(Client, e) for e in dir(Client) if '__' not in e]:
# del method
# print(self.__class__.login)
def login(self):
data = {
"username": self.username,
"password": self.password,
}
try:
res = json.loads(requests.post(url='/'.join([self.base_api_url, 'tokens']), data=data,
headers={'Content-Type': 'application/x-www-form-urlencoded'}).text)
except:
log.warning('guacamole client connection failed')
return None
try:
self.token = res['authToken']
self.available_data_sources = res['availableDataSources']
self.data_sources = res['dataSource']
return self.token
except KeyError:
log.warning('username:%s login failed' % self.username)
return None
# self.__del__()
def logout(self, force=True):
if force:
self.login()
return requests.delete(url='/'.join([self.base_api_url, 'tokens', self.token])).text
def info(self):
url = '/'.join([self.base_api_url, 'session', 'data', self.data_sources, 'connectionGroups', 'ROOT', 'tree'])
return json.loads(requests.get(url=url, params={'token': self.token}).text)
def get_connection_url(self, connection_id, connection_type):
path = b'\x00'.join([str(connection_id).encode(),
self.CONNECTION_TYPE.get(connection_type, 'c').encode(),
self.data_sources.encode()])
return ''.join(['/'.join([self.base_url, 'client', base64.b64encode(path).decode()]), '?token=', self.token])
def create_user_connection(client, username, password, remote_ip, host_username, host_password,
performance=0, connection_name='', connection_type='rdp', only_one=True):
if client.create_user(username, password).get('type', '') == 'BAD_REQUEST':
client.update_user(username, password)
# log.warning('用户存在,已更新用户密码')
if connection_name == '':
connection_name = username
if only_one:
new_group_id = "ROOT"
else:
res = client.create_connection_group(username)
new_group_id = res.get('identifier', None) or client.get_user_group_id(username) or "ROOT"
if connection_type == 'rdp':
res = client.create_rdp_connection(name=connection_name, hostname=remote_ip, username=host_username,
password=host_password, parent_identifier=new_group_id,
performance=performance)
else:
res = client.create_rdp_connection(name=connection_name, hostname=remote_ip, username=host_username,
password=host_password, parent_identifier=new_group_id,
performance=performance)
if res.get('message', None):
log.error(res['message'])
else:
new_connection_id = res.get('identifier', None) or client.get_user_connection_id(connection_name)
log.info('New connection id:%s' % new_connection_id)
if new_connection_id:
client.permissions(username=username, connection_id=new_connection_id, connection_group_id=new_group_id)
user_client = Client(username, password, client.host, client.port, client.server_path)
if user_client.login() and user_client.can_connect(new_connection_id):
return user_client.get_connection_url(new_connection_id, 'connection')
def delete_user_all_residuals(client, username):
# Client(username, password, client.host, client.port, client.server_path).logout()
client.kill_user_active_connections(username)
connection_id = client.get_user_connection_id(username)
if connection_id:
client.delete_connection(connection_id)
connection_group_id = client.get_user_group_id(username)
if connection_group_id:
client.delete_connection_group(connection_group_id)
client.delete_user(username)
def delete_connections(client, connection_name):
for _id in client.get_connections_id(connection_name):
client.delete_connection(_id)
log.info('Connection id:%s has been delete' % _id)
Python
1
https://gitee.com/AJay13/guacamole-py.git
git@gitee.com:AJay13/guacamole-py.git
AJay13
guacamole-py
guacamole-py
master

搜索帮助