Create your Gitee Account
Explore and code with more than 6 million developers. Free private repositories! :)
Sign up
This repository doesn't specify a license. Without the author's permission, this code is only for learning and cannot be used for other purposes.
Clone or download
robot.py 9.47 KB
Copy Edit Web IDE Raw Blame History
Mendalas authored 2019-06-19 16:50 . modify api
# coding:utf-8
from flask_restful import Resource
from auth import auth
from flask import Flask, abort, request, jsonify, g, url_for
from flask_sqlalchemy import SQLAlchemy
from api import app
import json
import urllib, urllib2
import requests
# SQLAlchemy handle bound to the Flask application imported from `api`.
db = SQLAlchemy(app)
# Earlier hard-coded RFID -> display-id table, kept for reference only:
# rfid2id ={ "2759450112":5,
# "3410259428":4,
# "3410189620":3,
# "3410259316":2,
# "756638963" :1 }
# Module-level cache mapping an RFID value to its display id. It starts empty
# and is filled from the RfidInspect table the first time RobotControl is
# instantiated; RobotStatusQuery only reads it.
rfid2id = dict()
class RobotInfo(db.Model):
    """ORM model for one row of the ``ins_rob`` table (one robot record)."""

    __tablename__ = 'ins_rob'

    robid = db.Column(db.Integer, primary_key=True, autoincrement=True)
    videoconf = db.Column(db.String(255))
    audioconf = db.Column(db.String(255))
    routeid = db.Column(db.Integer)
    listenaddr = db.Column(db.String(255))
    powerconf = db.Column(db.String(255))
    speed = db.Column(db.Integer)
    lastrfid = db.Column(db.BigInteger)
    updatetime = db.Column(db.DateTime)
    taskid = db.Column(db.Integer)
    sum_route_times = db.Column(db.Integer)
    sum_route_distances = db.Column(db.Integer)
    status = db.Column(db.Integer)
    ir_status = db.Column(db.Integer)
    rsoc = db.Column(db.Integer)
    rpm = db.Column(db.Integer)
    # Inspection plan, stored as a JSON-formatted string.
    planlist = db.Column(db.String(255))

    def to_json(self):
        """Return a JSON-serialisable dict of the robot's main fields.

        ``status`` is reduced to bit 4 of the stored status word (0 or 1),
        the same extraction RobotStatusQuery applies to this column.
        NOTE(review): bit 4 is presumably the control-mode flag -- confirm.

        Raises:
            TypeError: if the ``status`` column is NULL (``int(None)``).
        """
        return {
            "robid": self.robid,
            "videoconf": self.videoconf,
            "audioconf": self.audioconf,
            "routeid": self.routeid,
            "lastrfid": self.lastrfid,
            "listenaddr": self.listenaddr,
            "powerconf": self.powerconf,
            "sum_route_times": self.sum_route_times,
            "sum_route_distances": self.sum_route_distances,
            "speed": self.speed,
            # Mask first, then shift. Shifts bind tighter than `&`, so the
            # unparenthesised `x & (1 << 4) >> 4` evaluated as `x & 1` and
            # reported bit 0 instead of bit 4.
            "status": (int(self.status) & (1 << 4)) >> 4,
        }
class getInsPlan(Resource):
    """Endpoint that lists the stored inspection plan of every robot."""

    def post(self):
        """Return one ``{"robid", "planlist"}`` dict per row of ``ins_rob``."""
        rows = db.session.query(RobotInfo.robid, RobotInfo.planlist).all()
        return [
            {"robid": robid, "planlist": planlist}
            for robid, planlist in rows
        ]
# debug: battery level and other info
class upDateInsPlan(Resource):
    """Endpoint that overwrites the inspection plan of a single robot."""

    def post(self):
        """Store the posted ``planlist`` form field on the robot named by ``id``.

        The form value is passed through ``json.dumps`` before being saved.
        ``.one()`` raises if no row (or more than one) matches the id.
        """
        form = request.form
        robot_id = form['id']
        new_plan = form['planlist']

        matching = db.session.query(RobotInfo).filter(RobotInfo.robid == robot_id)
        record = matching.one()
        record.planlist = json.dumps(new_plan)
        db.session.commit()

        return {"result": "update OK"}
# debug: battery level and other info
class RobotCurrentStatus(Resource):
    """Endpoint that reports the stored record of one robot."""

    def get(self, robotId):
        """Return the ``to_json()`` view of robot ``robotId`` as a JSON response.

        ``.one()`` raises if no row (or more than one) matches the id.
        """
        lookup = db.session.query(RobotInfo).filter(RobotInfo.robid == robotId)
        robot = lookup.one()
        payload = robot.to_json()
        return jsonify(payload)
class RobotManualCtrDebug(Resource):
    """Debug endpoint that relays a manual-control or brake command to a robot."""

    def post(self, robotId):
        """Forward the command in the JSON request body to the robot's listener.

        The body's ``op`` field selects the command:

        * ``"1"`` -- every field of the body is converted to ``int`` and sent,
          JSON-encoded and URL-quoted, to ``<listenaddr>/manualctr``.
        * ``"2"`` -- ``<listenaddr>/break`` is called with an empty JSON object.

        Returns:
            ``{"result": 0, ...}`` when the robot answered, otherwise
            ``{"result": -1, ...}`` with a message describing the failure.

        Raises:
            KeyError: if the body has no ``op`` field.
            ValueError: if ``op == "1"`` and a field is not int-convertible.
            The ``.one()`` lookup raises if ``robotId`` matches no row.
        """
        robot = db.session.query(RobotInfo).filter(RobotInfo.robid == robotId).one()
        payload = request.get_json()
        op = payload['op']

        if op == "1":
            int_fields = {key: int(value) for key, value in payload.items()}
            robot_address = robot.listenaddr + "/manualctr?json=" + urllib.quote(
                json.dumps(int_fields))
        elif op == "2":
            robot_address = robot.listenaddr + "/break?json={}"
        else:
            # An unknown op used to fall through to urlopen('') and surface as
            # a bogus "connection problem"; reject it explicitly instead.
            app.logger.warning("unsupported manual control op: %r", op)
            return {"result": -1, "msg": "不支持的操作类型"}

        app.logger.info(robot_address)
        try:
            response = urllib2.urlopen(robot_address, timeout=5)
            try:
                response.read()
            finally:
                # Release the connection even if reading the body fails.
                response.close()
        except Exception:
            # Exception (not BaseException) so SystemExit/KeyboardInterrupt
            # still propagate; keep the traceback in the log for diagnosis.
            app.logger.exception("manual control request to %s failed", robot_address)
            return {"result": -1, "msg": "连接机器人时发生了点小问题"}
        return {"result": 0, "msg": "配置成功"}
class RobotControl(Resource):
    """Endpoint that switches a robot to auto mode or sends it to a target point."""

    def __init__(self):
        """Fill the shared ``rfid2id`` cache from RfidInspect on first use."""
        if not rfid2id:
            # NOTE(review): RfidInspect is neither imported nor defined in the
            # visible part of this module -- confirm it is in scope at runtime.
            rows = db.session.query(RfidInspect).all()
            app.logger.info("rfid2id is zero,init it,got size %d", len(rows))
            for row in rows:
                rfid2id[row.rfid] = row.showid

    def post(self, robotId):
        """Relay a mode-switch request from the page straight to the robot.

        The JSON body must carry an ``id`` field. ``-1`` puts the robot in
        automatic mode; any other value is treated as a display id, mapped
        back to its RFID through ``rfid2id`` and sent as a manual target.

        Returns:
            ``{"result": 0, ...}`` on success, ``{"result": -1, ...}`` when the
            ``id`` field, the robot record or the target is missing, or when
            the robot could not be reached.

        Raises:
            ValueError: if ``id`` is not int-convertible.
        """
        payload = request.get_json()
        if 'id' not in payload:
            return {"result": -1, "msg": "消息请求中未找到id字段"}
        targetId = int(payload['id'])
        app.logger.info("moving robot to:%d", targetId)

        # .first() instead of .one(): a missing robot now yields the
        # "robot config not found" reply below instead of an unhandled
        # exception that made that reply unreachable.
        robot = db.session.query(RobotInfo).filter(
            RobotInfo.robid == robotId).first()
        if robot is None:
            return {"result": -1, "msg": "没有找到机器人配置信息"}

        if targetId == -1:
            # Automatic mode, e.g. http://192.168.0.105:9001/auto?json={"speed":10}
            robotAddress = robot.listenaddr + "/auto?json={\"speed\":0}"
            if not self._send_command(robotAddress):
                return {"result": -1, "msg": "连接机器人时发生了点小问题"}
            return {"result": 0, "msg": "自动模式设置成功"}

        # Manual mode: reverse-lookup the RFID whose display id was requested.
        targetRfid = 0
        for rfid, showid in rfid2id.items():
            if showid == targetId:
                targetRfid = rfid
                break
        if not targetRfid:
            return {"result": -1, "msg": "没有找到目标id"}

        robotAddress = robot.listenaddr + "/manual?json={\"target_rfid\":" + str(
            targetRfid) + "}"
        # %s rather than %d so the log line formats whatever type the RFID
        # key is stored as.
        app.logger.info(
            "now moving robot to rfid:%s,begin to post to %s",
            targetRfid, robotAddress)
        if not self._send_command(robotAddress):
            return {"result": -1, "msg": "连接机器人时发生了点小问题"}
        return {"result": 0, "msg": "id:" + str(targetRfid)}

    def _send_command(self, robotAddress):
        """GET ``robotAddress`` with a 1 s timeout; return True if it answered."""
        try:
            response = urllib2.urlopen(robotAddress, timeout=1)
            try:
                response.read()
            finally:
                response.close()
        except Exception:
            # Replaces a bare `except:` that also swallowed SystemExit and
            # KeyboardInterrupt and dropped the failure reason.
            app.logger.exception("request to robot failed: %s", robotAddress)
            return False
        return True
class RobotStatusQuery(Resource):
    """Endpoint that returns robot records or a robot's live position/mode."""

    def get(self, id, status):
        """Answer one of three queries, selected by the URL arguments.

        * ``id == "0"`` -- JSON list with ``to_json()`` of every robot.
        * ``status == "-1"`` -- position summary for robot ``id``: a dict with
          ``currId`` (display id of the last RFID seen, -1 if unmapped),
          ``status`` (bit 4 of the stored status word) and two camera URLs.
          An empty dict is returned when the robot does not exist.
        * otherwise -- JSON list with ``to_json()`` of robot ``id``.

        Both arguments are compared against strings, so they are expected to
        arrive as strings.
        """
        if id == "0":
            # Configuration of all robots.
            results = db.session.query(RobotInfo).all()
        elif status == "-1":
            # Query the robot's last RFID and control mode; the RFID is mapped
            # to its numeric display id.
            results = db.session.query(RobotInfo).filter(
                RobotInfo.robid == id).all()
            if not results:
                # Logged through the app logger with real interpolation; the
                # old print emitted the raw "%d" template and the id separately.
                app.logger.warning(
                    "query robot %s RobotInfo from ins_rob,but got empty", id)
                return {}

            robot = results[0]
            # NOTE(review): the cache is looked up by the *string* form of
            # lastrfid while RobotControl stores RfidInspect.rfid unchanged --
            # confirm that column is a string, otherwise this never matches.
            keyid = str(robot.lastrfid)
            if keyid not in rfid2id:
                return {
                    "currId": -1,
                    "status": 0,
                    "camera1Url": '',
                    "camera2Url": ''
                }
            currId = rfid2id[keyid]
            status = (int(robot.status) & (1 << 4)) >> 4
            camera1 = "http://192.168.0.107/live?port=1935&app=stream&stream=" + str(
                robot.taskid) + "_0"
            # NOTE(review): camera2 uses the same "_0" suffix as camera1, so
            # both URLs are identical -- confirm whether "_1" was intended.
            camera2 = "http://192.168.0.107/live?port=1935&app=stream&stream=" + str(
                robot.taskid) + "_0"
            app.logger.info("robot got curr id %s %s", id, currId)
            return {
                "currId": currId,
                "status": status,
                "camera1Url": camera1,
                "camera2Url": camera2
            }
        else:
            results = db.session.query(RobotInfo).filter(RobotInfo.robid == id).all()

        return jsonify([robot.to_json() for robot in results])
class RobotConfigUpdate(Resource):
    """Endpoint that stores a posted configuration on one robot record."""

    def post(self):
        """Assign the JSON-encoded ``config`` field to the robot named by ``id``.

        NOTE(review): RobotInfo, as declared in this module, has no ``config``
        column, so this assignment is presumably not persisted by the commit --
        confirm which column was intended.
        ``.one()`` raises if no row (or more than one) matches the id.
        """
        body = request.get_json()
        robot_id = body['id']
        new_config = body['config']

        lookup = db.session.query(RobotInfo).filter(RobotInfo.robid == robot_id)
        record = lookup.one()
        record.config = json.dumps(new_config)
        db.session.commit()

        return {"result": "update OK"}
class RobotList(Resource):
    """Endpoint that lists the ids of all known robots."""

    def get(self):
        """Return ``{"id": [robid, ...]}`` as a JSON response."""
        rows = db.session.query(RobotInfo.robid).all()
        robot_ids = [row.robid for row in rows]
        return jsonify({"id": robot_ids})
class PTZControl(Resource):
    """Endpoint that relays pan/tilt/zoom, iris and focus commands to a camera."""

    def post(self):
        """Send the posted PTZ command to the camera of the given robot.

        The JSON body must carry ``PTZMsg`` (XML payload), ``PTZUrl`` (command
        name) and ``robot_id``.

        Returns:
            ``{"result": 0, "msg": 0}`` when the request was sent,
            ``{"result": -1, ...}`` when it failed or ``PTZMsg`` is missing.

        Raises:
            KeyError: if ``PTZUrl`` or ``robot_id`` is missing from the body.
        """
        payload = request.get_json()
        app.logger.info(payload)
        if 'PTZMsg' not in payload:
            # The message used to name the "id" field, which this endpoint
            # does not read; report the field that is actually missing.
            return {"result": -1, "msg": "消息请求中未找到PTZMsg字段"}
        res = self.PtzCtrl(payload['PTZMsg'], payload['PTZUrl'], payload['robot_id'])
        return {"result": res, "msg": res}

    def PtzCtrl(self, PTZMsg, PTZUrl, robot_id):
        """PUT ``PTZMsg`` to the camera's ISAPI endpoint for ``PTZUrl``.

        ``iris`` and ``focus`` are addressed under the video-input resource;
        every other command name goes under the PTZ control resource.

        Returns:
            0 if the HTTP request completed, -1 if it raised.
        """
        headers = {"content-type": 'application/xml'}
        # Compare as text so a JSON string "1" selects the same camera as the
        # integer 1; any other id uses the second camera, as before.
        if str(robot_id) == '1':
            host = 'http://192.168.100.200'
        else:
            host = 'http://192.168.100.210'

        # Build exactly one absolute path per command. Previously "iris"
        # appended a second path to the PTZ path and "focus" dropped the host,
        # so neither URL was valid.
        if PTZUrl in ('iris', 'focus'):
            url = host + '/ISAPI/System/Video/inputs/channels/1/' + PTZUrl
        else:
            url = host + '/ISAPI/PTZCtrl/channels/1/' + PTZUrl

        app.logger.info("put %s to %s", PTZMsg, url)
        try:
            # NOTE(review): camera credentials are hard-coded here; consider
            # moving them to configuration.
            response = requests.put(url, auth=('admin', 'admin888'), data=PTZMsg,
                                    headers=headers, timeout=1)
            app.logger.info(response.text)
            return 0
        except Exception as e:
            # Exception (not BaseException) so SystemExit/KeyboardInterrupt
            # are not swallowed.
            app.logger.info(e)
            return -1

Comment ( 0 )

Sign in to post a comment