# ELAB-partsearch/parts/server.py
import hmac
import json
import os
import re
from functools import wraps
from io import BytesIO
from os import listdir
from os.path import isfile, join
from urllib.parse import quote

import sqlalchemy
from flask import Flask
from flask import abort
from flask import render_template, send_from_directory, request, Response, send_file
from PIL import Image, ImageDraw
from sqlalchemy.sql import select
from sqlalchemy.sql import text
from werkzeug.utils import secure_filename
# Flask application object; the route decorators in this module register on it.
app = Flask(__name__)
# Placeholders: the __main__ block rebinds these to the SQLAlchemy engine,
# the reflected metadata and the `parts` table before the server starts.
db_engine = {}
db_metadata = {}
parts = {}
def getContainers():
    """Return all containers as a list of dicts with ``id`` and ``name``.

    Rows are ordered by upper-cased name, i.e. case-insensitively.
    """
    statement = text("select id, name from containers order by UPPER(name);")
    result = db_engine.execute(statement)
    rows = [dict(record) for record in result]
    result.close()
    return rows
def check_auth(username, password):
    """Check a username/password pair against ``edit_admin.json``.

    The file is expected to hold a JSON list of objects with ``username``
    and ``password`` keys. The first entry whose username matches decides
    the outcome.

    Returns:
        bool: True only when the user exists and the password matches;
        False otherwise (including missing or non-string credentials).
    """
    # NOTE(review): passwords are stored and compared in plain text; consider
    # storing salted hashes instead.
    if not isinstance(username, str) or not isinstance(password, str):
        return False
    with open('edit_admin.json', 'r') as admin:
        admin_list = json.load(admin)
    for user in admin_list:
        if username == user['username']:
            stored = user['password']
            if not isinstance(stored, str):
                return False
            # Constant-time comparison avoids leaking how many leading
            # characters matched. Encode first: compare_digest rejects
            # non-ASCII str arguments.
            return hmac.compare_digest(password.encode('utf-8'),
                                       stored.encode('utf-8'))
    return False
def authenticate():
    """Build the 401 response that challenges the client for HTTP Basic credentials."""
    message = 'Could not verify access level. Please retry'
    challenge_headers = {'WWW-Authenticate' : 'Basic realm="Login Required"'}
    return Response(message, 401, challenge_headers)
def requires_auth(f):
    """Decorator that runs ``f`` only for requests with valid credentials.

    When the request has no ``Authorization`` data, or ``check_auth`` rejects
    it, the 401 challenge from ``authenticate()`` is returned instead.
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        credentials = request.authorization
        if credentials and check_auth(credentials.username, credentials.password):
            return f(*args, **kwargs)
        return authenticate()
    return guarded
def serveImage(img):
    """Send ``img`` to the client as a PNG response.

    The image is converted to RGB and encoded into an in-memory buffer, so
    nothing is written to disk.
    """
    rgb_image = img.convert("RGB")
    buffer = BytesIO()
    rgb_image.save(buffer, 'PNG')
    # Rewind so send_file reads the encoded bytes from the start.
    buffer.seek(0)
    return send_file(buffer, mimetype='image/png')
@app.route('/parts', strict_slashes=False)
def index():
    """Render the part-search page, passing it the list of containers."""
    container_list = getContainers()
    return render_template('partsearch.html', containers=container_list)
@app.route('/parts/getlocationsInContainer/<containerID>')
def get_locations_in_container(containerID):
    """Return a JSON object mapping location id to location name for one container."""
    statement = text('select id, name from locations where container_id = :id order by name;')
    result = db_engine.execute(statement, id=containerID)
    names_by_id = {record[0]: record[1] for record in result}
    result.close()
    return json.dumps(names_by_id)
@app.route('/parts/getlocationURL/<locationID>')
def get_locationURL(locationID):
    """Return the ``map`` value stored for a location.

    Responds with 404 when the location does not exist or has no map,
    instead of failing with an unhandled IndexError / invalid ``None``
    response.
    """
    statement = text('select map from locations where id = :id;')
    result = db_engine.execute(statement, id=locationID)
    try:
        row = result.fetchone()
    finally:
        # Release the result even if fetching fails.
        result.close()
    if row is None or row[0] is None:
        return Response('Location map not found', 404)
    return row[0]
@app.route('/parts/locationEditor')
def locationEditor():
    """Render the location editor with every location (joined to its container) and all containers."""
    statement = text('select c.name as container, l.name as name, l.id, c.id as container_id from locations as l inner join containers as c on l.container_id = c.id order by container, name;')
    result = db_engine.execute(statement)
    location_rows = [dict(record) for record in result]
    result.close()
    return render_template('locationEditor.html',
                           locations=location_rows,
                           containers=getContainers())
@app.route('/parts/alterLocation/<locationID>', methods=['POST'])
@requires_auth
def alterLocation(locationID):
    """Create or update a location from the posted ``name`` and ``container``.

    A negative ``locationID`` inserts a new row; any other value updates the
    row with that id. Returns a small JSON status string.
    """
    location_id = int(locationID)
    is_new = location_id < 0
    if is_new:
        statement = text('insert into locations (name, container_id) '
                         'values (:name, :container);')
    else:
        statement = text('update locations '
                         'set name=:name, container_id=:container '
                         'where id=:locationID;')
    params = {
        'name': request.form['name'],
        'container': request.form['container'],
    }
    if not is_new:
        params['locationID'] = location_id
    result = db_engine.execute(statement, **params)
    result.close()
    return '{"status":"ok"}'
@app.route('/parts/getpartinfo/<partID>')
def get_part_info(partID):
    """Return one part, joined with its location and container, as a JSON object.

    Responds with 404 when no part has the given id, instead of failing
    with an unhandled IndexError.
    """
    statement = text('select p.id,partno,description, c.name || l.name as location_descriptor, location_id, container_id, datasheet from parts as p inner join locations as l on p.location_id = l.id inner join containers as c on l.container_id = c.id where p.id = :id;')
    result = db_engine.execute(statement, id=partID)
    try:
        row = result.fetchone()
    finally:
        # Release the result even if fetching fails.
        result.close()
    if row is None:
        return Response('Part not found', 404)
    return json.dumps(dict(row))
@app.route('/parts/query/<filter_dummy>/<query>')  # TODO: maybe change AND to OR or maybe not
def query(filter_dummy, query):
    """Search parts by keyword and return the matches as a JSON list.

    ``query`` is split on whitespace; every keyword must match
    (case-insensitive substring) within a column for that column to count,
    and a part is returned when any enabled column matches.

    Columns are enabled through the query-string flags ``l`` (location
    name), ``p`` (part number) and ``d`` (description), each equal to
    ``'true'``. ``filter_dummy`` is a URL placeholder and is not used.

    Returns ``[]`` when there are no keywords or no column is enabled; the
    previous slice-based assembly produced invalid SQL in those cases.
    """
    flags = request.args.to_dict()
    patterns = ['%' + word + '%' for word in query.split()]
    searchable = (('l', 'l.name'), ('p', 'partno'), ('d', 'description'))
    # A missing flag counts as "off" instead of raising KeyError.
    columns = [column for flag, column in searchable if flags.get(flag) == 'true']
    if not patterns or not columns:
        return json.dumps([])

    kw_dict = {'kw' + str(i): pattern for i, pattern in enumerate(patterns)}
    groups = []
    for column in columns:
        terms = ['LOWER(' + column + ') like LOWER(:kw' + str(i) + ')'
                 for i in range(len(patterns))]
        groups.append('(' + ' AND '.join(terms) + ')')

    s = 'select p.id,partno,description, c.name || l.name as location_descriptor from parts as p inner join locations as l on p.location_id = l.id inner join containers as c on l.container_id = c.id where '
    s += ' OR '.join(groups) + ';'

    # Keywords are passed as bound parameters, never spliced into the SQL.
    result = db_engine.execute(text(s), kw_dict)
    try:
        matches = [dict(row) for row in result]
    finally:
        result.close()
    return json.dumps(matches)
@app.route('/parts/map/<containerID>')
def getMap(containerID):
    """Serve a container's map image with its overlay composited on top.

    The base map is read from ``maps/`` and the overlay from
    ``maps/overlays/``. ``maps/404.png`` is served instead when the
    container is unknown, when either file name is NULL in the database,
    or when either file is missing on disk.
    """
    statement = text('select map, overlay from containers where id = :id;')
    result = db_engine.execute(statement, id=containerID)
    try:
        row = result.fetchone()
    finally:
        # The original never closed this result.
        result.close()

    record = dict(row) if row is not None else {}
    map_file = record.get('map')
    overlay_file = record.get('overlay')
    if map_file is None or overlay_file is None:
        return serveImage(Image.open("maps/404.png"))

    try:
        map_image = Image.open('maps/' + map_file).convert("RGBA")
        overlay_image = Image.open('maps/overlays/' + overlay_file).convert("RGBA")
    except FileNotFoundError:
        return serveImage(Image.open("maps/404.png"))

    # TODO: optional "you are here" pointer (maps/here.png pasted at the
    # request's x/y arguments) was sketched here but never enabled.
    return serveImage(Image.alpha_composite(map_image, overlay_image))
@app.route('/parts/getfile/<filename>')
def getfile(filename):
    """Serve ``<filename>.pdf`` from ``/srv/datasheets/``.

    Only names made of word characters, ``-`` and ``_`` are accepted; any
    other name gets the rejection message instead of a file.
    """
    # Raw string avoids invalid-escape warnings; fullmatch (unlike ``$``)
    # does not accept a trailing newline.
    if re.fullmatch(r'[\w\-_]+', filename) is None:
        return 'No injections pls.'
    return send_from_directory('/srv/datasheets/', filename + '.pdf')
# Directory where uploaded datasheets are stored, and the public URL prefix
# under which they are recorded in the database.
_DATASHEET_DIR = '/srv/datasheets/'
_DATASHEET_URL_PREFIX = 'http://elab.kth.se/parts/getfile/'


def _store_datasheet(upload):
    """Save an uploaded datasheet under an unused filename and return its URL."""
    filename = secure_filename(upload.filename)
    stem, extension = os.path.splitext(filename)
    candidate = filename
    counter = 1
    # Probe the same directory the file is saved to. The original probed the
    # relative path 'srv/datasheets/', so collisions were never detected.
    while os.path.isfile(os.path.join(_DATASHEET_DIR, candidate)):
        candidate = stem + str(counter) + extension
        counter += 1
    upload.save(os.path.join(_DATASHEET_DIR, candidate))
    return _DATASHEET_URL_PREFIX + candidate


def _remove_stored_datasheet(reference):
    """Delete the local file behind a stored datasheet reference, if any."""
    if not reference:
        return
    if reference.startswith(_DATASHEET_URL_PREFIX):
        name = reference[len(_DATASHEET_URL_PREFIX):]
    elif '://' in reference:
        # External URL: nothing stored locally.
        return
    else:
        # Presumably a bare filename from older rows — verify against the data.
        name = reference
    # basename keeps the deletion inside the datasheet directory.
    name = os.path.basename(name)
    if not name:
        return
    try:
        os.remove(os.path.join(_DATASHEET_DIR, name))
    except FileNotFoundError:
        # Best effort: the old file is already gone.
        pass


@app.route('/parts/alter/<partID>', methods=['POST'])
@requires_auth
def alter(partID):
    """Create or update a part from the posted form and optional upload.

    A negative ``partID`` inserts a new part; any other value updates the
    part with that id. Form fields ``partno``, ``description`` and
    ``location_id`` are required. The datasheet comes from the uploaded
    ``datasheet-file`` when files are present, else from the optional
    ``datasheet-url`` field; on update the stored value is kept when
    neither is given.

    Returns a JSON status string containing the part id, or a 404 JSON
    error when updating a part that does not exist.
    """
    partID = int(partID)
    # Read required fields before touching the filesystem, so a bad request
    # does not leave an orphaned upload behind.
    fields = {
        'partno': request.form['partno'],
        'description': request.form['description'],
        'location_id': request.form['location_id'],
    }
    upload = request.files['datasheet-file'] if len(request.files) != 0 else None
    replaced_datasheet = None

    if partID < 0:
        # New entry
        if upload is not None:
            datasheet = _store_datasheet(upload)
        else:
            datasheet = request.form.get('datasheet-url')
        statement = text('insert into parts (partno, description, datasheet, location_id) '
                         'values (:partno, :description, :datasheet, :location_id) returning id;')
        result = db_engine.execute(statement, datasheet=datasheet, **fields)
    else:
        # Modify entry
        lookup = db_engine.execute(text('select * from parts where id=:id;'), id=partID)
        try:
            existing = lookup.fetchone()
        finally:
            lookup.close()
        if existing is None:
            return Response('{"status":"error", "message":"part not found"}',
                            404, mimetype='application/json')
        previous = dict(existing)['datasheet']
        if upload is not None:
            datasheet = _store_datasheet(upload)
            replaced_datasheet = previous
        else:
            datasheet = request.form.get('datasheet-url', previous)
        statement = text('update parts '
                         'set partno=:partno, description=:description, datasheet=:datasheet, location_id=:location_id '
                         'where id=:id returning id;')
        result = db_engine.execute(statement, datasheet=datasheet, id=partID, **fields)

    try:
        new_id = result.fetchone()[0]
    finally:
        result.close()
    # Drop the superseded file only after the database accepted the update.
    _remove_stored_datasheet(replaced_datasheet)
    return '{"status":"ok", "part_id" : ' + str(new_id) + '}'
@app.route('/parts/delete/<partID>')
# @requires_auth
# NOTE(review): with the decorator above disabled, any client can delete
# parts with a plain GET request — confirm this is intended.
def delete(partID):
    """Delete the part with the given id and return a JSON status string.

    Aborts with 400 when ``partID`` is negative or not an integer.
    """
    try:
        part_id = int(partID)
    except ValueError:
        abort(400)
    if part_id < 0:
        abort(400)
    result = db_engine.execute(text('delete from parts where id=:id;'), id=part_id)
    result.close()
    return '{"status":"ok"}'
@app.route('/parts/deleteLocation/<locationID>')
# @requires_auth
# NOTE(review): with the decorator above disabled, any client can delete
# locations with a plain GET request — confirm this is intended.
def deleteLocation(locationID):
    """Delete the location with the given id and return a JSON status string.

    Aborts with 400 when ``locationID`` is negative or not an integer.
    """
    try:
        location_id = int(locationID)
    except ValueError:
        abort(400)
    if location_id < 0:
        abort(400)
    result = db_engine.execute(text('delete from locations where id=:id;'), id=location_id)
    result.close()
    return '{"status":"ok"}'
def connect(user, password, db, host='localhost', port=5432):
    '''Returns a connection and a metadata object

    Builds a PostgreSQL URL from the arguments, creates the engine and
    reflects the database schema into a MetaData bound to that engine.
    ``user`` and ``password`` are percent-encoded, so characters such as
    ``@``, ``:`` or ``/`` in the credentials no longer corrupt the URL.
    '''
    # We connect with the help of the PostgreSQL URL
    url = 'postgresql://{}:{}@{}:{}/{}'.format(
        quote(str(user), safe=''),
        quote(str(password), safe=''),
        host,
        port,
        db,
    )

    # The return value of create_engine() is our connection object
    con = sqlalchemy.create_engine(url, client_encoding='utf8')

    # We then bind the connection to MetaData()
    meta = sqlalchemy.MetaData(bind=con, reflect=True)

    return con, meta
if __name__ == '__main__':
    # Database credentials are read from admin.json in the working directory.
    with open('admin.json') as f:
        postgres_credentials = json.load(f)

    db_engine, db_metadata = connect(
        postgres_credentials['username'],
        postgres_credentials['password'],
        'parts_v2',
    )
    parts = sqlalchemy.Table('parts', db_metadata)

    # Example query
    #   s = select([parts]).where(parts.c.notes != '')
    #   for row in db_engine.execute(s):
    #       print row

    # Listen on all interfaces.
    app.run('0.0.0.0')