You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
ELAB-partsearch/parts/server.py

291 lines
10 KiB

import hmac
import json
import os
import re
from functools import wraps
from io import BytesIO
from os import listdir
from os.path import isfile, join
from urllib.parse import quote_plus

import sqlalchemy
from flask import Flask
from flask import abort, render_template, send_from_directory, request, Response, send_file
from PIL import Image, ImageDraw
from sqlalchemy.sql import select
from sqlalchemy.sql import text
from werkzeug.utils import secure_filename
# Flask application object; the @app.route decorators below register handlers on it.
app = Flask(__name__)
# Placeholders: both are rebound to the values returned by connect() when this
# file is executed as a script (see the __main__ block at the end of the file).
db_engine = {}
db_metadata = {}
# Placeholder: rebound to a sqlalchemy.Table for 'parts' in the __main__ block.
parts = {}
def getContainers():
    """Return all containers as a list of dicts with ``id`` and ``name`` keys.

    Rows come back ordered by ``UPPER(name)``, i.e. case-insensitively.
    """
    result = db_engine.execute(
        text("select id, name from containers order by UPPER(name);"))
    container_rows = [dict(record) for record in result]
    result.close()
    return container_rows
def check_auth(username, password):
    """Check a username/password pair against the entries in edit_admin.json.

    The file is re-read on every call and must contain a JSON list of objects
    with ``username`` and ``password`` keys. Only the first entry whose
    username matches is considered.

    Returns:
        True when the first matching entry's password equals ``password``;
        False otherwise (including unknown users and non-string passwords).
    """
    with open('edit_admin.json', 'r') as admin:
        admin_list = json.load(admin)
    for user in admin_list:
        if username != user['username']:
            continue
        expected = user['password']
        if not isinstance(password, str) or not isinstance(expected, str):
            return False
        # Constant-time comparison so response timing does not leak how much
        # of the password matched; encode first because compare_digest
        # rejects non-ASCII str arguments.
        return hmac.compare_digest(password.encode('utf-8'),
                                   expected.encode('utf-8'))
    # Unknown user: return an explicit False rather than an implicit None.
    return False
def authenticate():
    """Build the 401 response carrying an HTTP Basic ``WWW-Authenticate`` challenge."""
    challenge_headers = {'WWW-Authenticate' : 'Basic realm="Login Required"'}
    message = 'Could not verify access level. Please retry'
    return Response(message, 401, challenge_headers)
def requires_auth(f):
    """Decorator that runs ``f`` only for requests with valid credentials.

    Requests without an Authorization header, or whose credentials are
    rejected by check_auth(), receive the authenticate() response instead.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        if credentials and check_auth(credentials.username, credentials.password):
            return f(*args, **kwargs)
        return authenticate()
    return decorated
def serveImage(img):
    """Send ``img`` to the client as a PNG.

    The image is converted to RGB and encoded into an in-memory buffer,
    which is then handed to send_file().
    """
    buffer = BytesIO()
    rgb_image = img.convert("RGB")
    rgb_image.save(buffer, 'PNG')
    # Rewind so send_file() reads the encoded bytes from the start.
    buffer.seek(0)
    return send_file(buffer, mimetype='image/png')
@app.route('/parts')
def index():
    """Render the part-search page, passing it the list of containers."""
    container_list = getContainers()
    return render_template('partsearch.html', containers=container_list)
@app.route('/parts/getlocationsInContainer/<containerID>')
def get_locations_in_container(containerID):
    """Return a JSON object mapping location id to name for one container."""
    statement = text('select id, name from locations where container_id = :id order by name;')
    result = db_engine.execute(statement, id=containerID)
    names_by_id = {record[0]: record[1] for record in result}
    result.close()
    return json.dumps(names_by_id)
@app.route('/parts/getlocationURL/<locationID>')
def get_locationURL(locationID):
    """Return the ``map`` column of the location with the given id.

    Responds with 404 when no such location exists or its map is NULL;
    previously both cases surfaced as an unhandled error (HTTP 500).
    """
    result = db_engine.execute(
        text('select map from locations where id = :id;'), id=locationID)
    try:
        row = result.fetchone()
    finally:
        result.close()
    if row is None or row[0] is None:
        return 'Location map not found', 404
    return row[0]
@app.route('/parts/locationEditor')
def locationEditor():
    """Render the location editor with all locations and all containers.

    Each location is a dict with ``container`` (container name), ``name``
    and ``id``, ordered by container and then by name.
    """
    statement = text('select c.name as container, l.name as name, l.id from locations as l inner join containers as c on l.container_id = c.id order by container, name;')
    result = db_engine.execute(statement)
    location_rows = [dict(record) for record in result]
    result.close()
    return render_template('locationEditor.html',
                           locations=location_rows,
                           containers=getContainers())
@app.route('/parts/alterLocation/<locationID>', methods=['POST'])
# @requires_auth
def alterLocation(locationID):
    """Create or update a location from the posted ``name`` and ``map`` fields.

    A negative ``locationID`` inserts a new row; any other value updates the
    row with that id. Returns the JSON string ``{"status":"ok"}``.
    """
    locationID = int(locationID)
    params = {'name': request.form['name'], 'map': request.form['map']}
    if locationID < 0:
        # New entry.
        # NOTE(review): container_id is not set here although other queries
        # in this file join locations on it -- confirm the column has a
        # usable default.
        statement = text('insert into locations (name, map) values (:name, :map);')
    else:
        # Modify entry. The original statement had a trailing comma before
        # "where" ("map=:map, where ..."), which is a SQL syntax error.
        statement = text('update locations set name=:name, map=:map where id=:locationID;')
        params['locationID'] = locationID
    r = db_engine.execute(statement, **params)
    r.close()
    return '{"status":"ok"}'
@app.route('/parts/getpartinfo/<partID>')
def get_part_info(partID):
    """Return one part as a JSON object.

    The object carries id, partno, description, location_descriptor
    (container name concatenated with location name), location_id,
    container_id and datasheet. Responds with 404 when the part does not
    exist; previously that case raised IndexError (HTTP 500).
    """
    s = 'select p.id,partno,description, c.name || l.name as location_descriptor, location_id, container_id, datasheet from parts as p inner join locations as l on p.location_id = l.id inner join containers as c on l.container_id = c.id where p.id = :id;'
    r = db_engine.execute(text(s), id=partID)
    try:
        row = r.fetchone()
    finally:
        r.close()
    if row is None:
        return '{"status":"error", "message":"part not found"}', 404
    return json.dumps(dict(row))
def _build_search_clause(columns, keyword_count):
    """Return the WHERE-clause body for a keyword search.

    Each column gets one parenthesised group requiring every keyword
    (bound as :kw0, :kw1, ...) to match case-insensitively; the groups are
    joined with OR.
    """
    groups = []
    for column in columns:
        terms = ['LOWER({}) like LOWER(:kw{})'.format(column, i)
                 for i in range(keyword_count)]
        groups.append('(' + ' AND '.join(terms) + ')')
    return ' OR '.join(groups)


@app.route('/parts/query/<filter_dummy>/<query>') # TODO: maybe change AND to OR or maybe not
def query(filter_dummy, query):
    """Search parts by whitespace-separated keywords; return a JSON list.

    The query-string flags ``l``, ``p``, ``d`` and ``n`` select which columns
    are searched (location name, partno, description, notes). A part matches
    when, for at least one selected column, every keyword occurs in it.

    Returns ``[]`` when there are no keywords or no column is selected;
    previously those cases produced malformed SQL, and a missing flag
    raised KeyError.

    ``filter_dummy`` is accepted from the URL but not used.
    """
    selected = request.args.to_dict()
    patterns = ['%' + word + '%' for word in query.split()]
    # NOTE(review): as before, any non-empty value enables a flag, including
    # strings such as 'false' or '0' -- confirm what the client sends.
    flag_columns = (('l', 'l.name'), ('p', 'partno'),
                    ('d', 'description'), ('n', 'notes'))
    columns = [column for flag, column in flag_columns if selected.get(flag)]
    if not patterns or not columns:
        return json.dumps([])

    kw_dict = {'kw' + str(i): pattern for i, pattern in enumerate(patterns)}
    s = 'select p.id,partno,description, c.name || l.name as location_descriptor from parts as p inner join locations as l on p.location_id = l.id inner join containers as c on l.container_id = c.id where '
    s += _build_search_clause(columns, len(patterns)) + ';'
    r = db_engine.execute(text(s), kw_dict)
    matches = [dict(row) for row in r]
    r.close()
    return json.dumps(matches)
@app.route('/parts/map/<containerID>')
def getMap(containerID):
    """Serve a container's map image with its overlay composited on top.

    Falls back to ``maps/404.png`` when the container does not exist, when
    its map or overlay column is NULL, or when either image file is missing.
    Previously only the missing-file case was handled and the query result
    was never closed.
    """
    r = db_engine.execute(
        text('select map, overlay from containers where id = :id;'),
        id=containerID)
    try:
        row = r.fetchone()
    finally:
        r.close()
    if row is None or row[0] is None or row[1] is None:
        return serveImage(Image.open("maps/404.png"))
    mapFile, overlayFile = row[0], row[1]
    try:
        mapImage = Image.open('maps/' + mapFile).convert("RGBA")
        overlayImage = Image.open('maps/overlays/' + overlayFile).convert("RGBA")
    except FileNotFoundError:
        return serveImage(Image.open("maps/404.png"))
    # TODO: an earlier, disabled draft pasted 'maps/here.png' as a pointer
    # centred on the request's x/y arguments before compositing.
    return serveImage(Image.alpha_composite(mapImage, overlayImage))
@app.route('/parts/getfile/<filename>')
def getfile(filename):
    """Serve ``/srv/datasheets/<filename>.pdf``.

    ``filename`` may contain only word characters and dashes; anything else
    gets the plain-text rejection message instead of a file.
    """
    # fullmatch with a raw string: the old '^...$' pattern passed to
    # re.match also accepted a trailing newline, and the non-raw literal
    # triggered invalid-escape warnings.
    if re.fullmatch(r'[\w\-]+', filename) is None:
        return 'No injections pls.'
    return send_from_directory('/srv/datasheets/', filename + '.pdf')
_DATASHEET_DIR = '/srv/datasheets/'


def _store_datasheet(datasheet_file):
    """Save an uploaded datasheet under a unique name and return that name.

    Returns None when ``datasheet_file`` is None or has no usable filename.
    """
    if datasheet_file is None:
        return None
    filename = secure_filename(datasheet_file.filename or '')
    if not filename:
        return None
    stem, extension = os.path.splitext(filename)
    candidate = filename
    suffix = 1
    # Check the directory the file is actually saved into. The original
    # looked in 'srv/datasheet/', so collisions were never detected.
    while os.path.isfile(os.path.join(_DATASHEET_DIR, candidate)):
        candidate = stem + str(suffix) + extension
        suffix += 1
    datasheet_file.save(os.path.join(_DATASHEET_DIR, candidate))
    return candidate


@app.route('/parts/alter/<partID>', methods=['POST'])
# @requires_auth
def alter(partID):
    """Create or update a part from the posted form.

    Reads ``partno``, ``description`` and ``location_id`` from the form and
    an optional ``datasheet-file`` upload. A negative ``partID`` inserts a
    new part; any other value updates that part. On update, a newly uploaded
    datasheet replaces the previous one and the previous file is deleted;
    without an upload the stored datasheet name is kept.

    Returns a JSON string with ``status`` and the affected ``part_id``, or a
    404 JSON error when the part to update does not exist.
    """
    partID = int(partID)
    stale_datasheet = None
    if partID < 0:
        # New entry
        datasheet_filename = _store_datasheet(request.files.get('datasheet-file'))
        s = text('insert into parts (partno, description, datasheet, location_id) '
                 'values (:partno, :description, :datasheet, :location_id) returning id;')
        r = db_engine.execute(s, partno=request.form['partno'],
                              description=request.form['description'],
                              datasheet=datasheet_filename,
                              location_id=request.form['location_id'])
    else:
        # Modify entry
        r = db_engine.execute(text('select * from parts where id=:id;'), id=partID)
        existing = r.fetchone()
        r.close()
        if existing is None:
            return '{"status":"error", "message":"part not found"}', 404
        previous_datasheet = dict(existing)['datasheet']
        uploaded_name = _store_datasheet(request.files.get('datasheet-file'))
        if uploaded_name is None:
            datasheet_filename = previous_datasheet
        else:
            datasheet_filename = uploaded_name
            stale_datasheet = previous_datasheet
        s = text('update parts '
                 'set partno=:partno, description=:description, datasheet=:datasheet, location_id=:location_id '
                 'where id=:id returning id;')
        r = db_engine.execute(s, partno=request.form['partno'],
                              description=request.form['description'],
                              datasheet=datasheet_filename,
                              location_id=request.form['location_id'],
                              id=partID)
    new_id = r.fetchone()[0]
    r.close()
    # Remove the replaced file only after the row points at the new one, and
    # tolerate it already being gone.
    if stale_datasheet is not None and stale_datasheet != datasheet_filename:
        try:
            os.remove(os.path.join(_DATASHEET_DIR, stale_datasheet))
        except FileNotFoundError:
            pass
    return '{"status":"ok", "part_id" : ' + str(new_id) + '}'
@app.route('/parts/delete/<partID>')
# @requires_auth
def delete(partID):
    """Delete the part with the given id and return ``{"status":"ok"}``.

    Aborts with HTTP 400 when ``partID`` is negative or not an integer.
    ``abort`` was previously called without being imported, so the negative
    branch raised NameError.
    """
    try:
        part_id = int(partID)
    except ValueError:
        abort(400)
    if part_id < 0:
        abort(400)
    r = db_engine.execute(text('delete from parts where id=:id;'), id=part_id)
    r.close()
    return '{"status":"ok"}'
def connect(user, password, db, host='localhost', port=5432):
    '''Returns a connection and a metadata object

    Builds a PostgreSQL URL from the arguments, creates an engine for it and
    returns ``(engine, metadata)``, where the metadata is bound to the engine
    and has been reflected from the database.
    '''
    # User and password are URL-encoded so characters such as '@', ':' or '/'
    # do not corrupt the connection URL.
    # NOTE(review): if stored credentials were pre-encoded by hand as a
    # workaround, store them in plain form instead.
    url = 'postgresql://{}:{}@{}:{}/{}'.format(
        quote_plus(user), quote_plus(password), host, port, db)

    # The return value of create_engine() is our connection object
    con = sqlalchemy.create_engine(url, client_encoding='utf8')

    # Bind the connection to MetaData(), then reflect explicitly; the
    # ``reflect=True`` constructor flag is deprecated.
    meta = sqlalchemy.MetaData(bind=con)
    meta.reflect()
    return con, meta
if __name__ == '__main__':
    # Database credentials are read from admin.json ('username' / 'password').
    with open('admin.json') as f:
        postgres_credentials = json.load(f)
    db_engine, db_metadata = connect(
        postgres_credentials['username'],
        postgres_credentials['password'],
        'parts_v2',
    )
    parts = sqlalchemy.Table('parts', db_metadata)
    # Example query (reference only, never executed):
    #   s = select([parts]).where(parts.c.notes != '')
    #   for row in db_engine.execute(s):
    #       print row
    # Listen on all interfaces.
    app.run('0.0.0.0')