You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
ELAB-partsearch/parts/server.py

272 lines
9.3 KiB

import os
import re
import json
import sqlalchemy
from functools import wraps
from sqlalchemy.sql import select
from sqlalchemy.sql import text
from flask import Flask
from flask import render_template, send_from_directory, request, Response, send_file
from PIL import Image, ImageDraw
from io import BytesIO
from os import listdir
from os.path import isfile, join
from werkzeug.utils import secure_filename
# Flask application object; the route handlers below register themselves on it.
app = Flask(__name__)
# Placeholders that are rebound in the __main__ section at the bottom of the
# file once connect() has produced the engine and metadata.
db_engine = {}
db_metadata = {}
parts = {}
def check_auth(username, password):
    """Return True when the credentials match an entry in edit_admin.json.

    The file is read on every call and must contain a JSON list of objects
    with ``username`` and ``password`` keys. The first entry whose username
    matches decides the outcome.

    :param username: user name supplied by the client.
    :param password: password supplied by the client.
    :returns: ``True`` on a match, ``False`` otherwise (including unknown
        users and non-string passwords).
    """
    # Local import so this block does not depend on the module import list.
    import hmac

    with open('edit_admin.json', 'r') as admin:
        admin_list = json.load(admin)
    for user in admin_list:
        if username == user['username']:
            expected = user['password']
            if not isinstance(password, str) or not isinstance(expected, str):
                return False
            # Constant-time comparison; encode first because compare_digest
            # rejects non-ASCII str arguments.
            return hmac.compare_digest(password.encode('utf-8'),
                                       expected.encode('utf-8'))
    # Unknown user: answer explicitly instead of falling through with None.
    return False
def authenticate():
    """Build the 401 response that asks the client for HTTP Basic credentials."""
    challenge_headers = {'WWW-Authenticate' : 'Basic realm="Login Required"'}
    message = 'Could not verify access level. Please retry'
    return Response(message, 401, challenge_headers)
def requires_auth(f):
    """Decorator that only runs the wrapped view for valid Basic-auth credentials.

    When the request carries no credentials, or check_auth() rejects them,
    the response built by authenticate() is returned instead of calling *f*.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        credentials = request.authorization
        if credentials and check_auth(credentials.username, credentials.password):
            return f(*args, **kwargs)
        return authenticate()
    return wrapper
def serveImage(img):
    """Send *img* to the client as a PNG.

    The image is converted to RGB, encoded into an in-memory buffer and
    handed to send_file() with the image/png mimetype.
    """
    buffer = BytesIO()
    rgb_image = img.convert("RGB")
    rgb_image.save(buffer, 'PNG')
    # Rewind so send_file() reads the encoded bytes from the start.
    buffer.seek(0)
    return send_file(buffer, mimetype='image/png')
@app.route('/parts')
def index():
    """Render the part-search page with the id and name of every location."""
    statement = text("select id, name from locations order by left(map, strpos(map, '.')-1), UPPER(name);")
    result = db_engine.execute(statement)
    locations = [dict(row) for row in result]
    result.close()
    return render_template('partsearch.html', locations=locations)
@app.route('/parts/getlocationURL/<locationID>')
def get_locationURL(locationID):
    """Return the ``map`` value stored for the location with id *locationID*.

    :param locationID: location id taken from the URL.
    :returns: the map value as the response body, or a 404 response when no
        such location exists or its map column is empty.
    """
    s = 'select map from locations where id = :id;'
    r = db_engine.execute(text(s), id=locationID)
    try:
        row = r.fetchone()
    finally:
        r.close()
    # Indexing an empty result used to raise IndexError (HTTP 500), and a
    # NULL map would make the view return None; answer 404 in both cases.
    if row is None or row[0] is None:
        return 'Location not found', 404
    return row[0]
@app.route('/parts/locationEditor')
def locationEditor():
    """Render the location editor with all locations and the available map files."""
    result = db_engine.execute(text('select * from locations order by name;'))
    locations = [dict(row) for row in result]
    result.close()
    # Files in maps/ that are not selectable maps.
    excluded = ('here.png', '404.png', '.DS_Store')
    mapfiles = []
    for entry in listdir('maps'):
        if entry in excluded:
            continue
        if isfile(join('maps', entry)):
            mapfiles.append(entry)
    return render_template('locationEditor.html', locations=locations, mapfiles=mapfiles, defaultMapfile="elab.png")
@app.route('/parts/alterLocation/<locationID>', methods=['POST'])
# @requires_auth
def alterLocation(locationID):
    """Create or update a location from the posted ``name`` and ``map`` fields.

    :param locationID: id from the URL; a negative value inserts a new
        location, any other value updates the location with that id.
    :returns: the JSON text ``{"status":"ok"}``.
    :raises ValueError: if *locationID* is not an integer.
    """
    locationID = int(locationID)
    params = {'name': request.form['name'], 'map': request.form['map']}
    if locationID < 0:
        # New entry
        s = text('insert into locations (name, map) values (:name, :map);')
    else:
        # Modify entry. The statement used to read "map=:map, where ...";
        # the comma before WHERE made every update fail with a syntax error.
        s = text('update locations set name=:name, map=:map where id=:locationID;')
        params['locationID'] = locationID
    r = db_engine.execute(s, **params)
    r.close()
    return '{"status":"ok"}'
@app.route('/parts/getpartinfo/<partID>')
def get_part_info(partID):
    """Return the part with id *partID*, joined with its location, as JSON.

    :param partID: part id taken from the URL.
    :returns: a JSON object of the first matching row, or a 404 response
        when no part has that id.
    """
    s = 'select * from parts as p inner join locations as l on p.location_id=l.id where p.id = :id;'
    r = db_engine.execute(text(s), id=partID)
    try:
        row = r.fetchone()
    finally:
        r.close()
    # An unknown id used to raise IndexError and surface as HTTP 500.
    if row is None:
        return 'Part not found', 404
    return json.dumps(dict(row))
@app.route('/parts/query/<filter_dummy>/<query>') # TODO: maybe change AND to OR or maybe not
def query(filter_dummy, query):
    """Search parts by keyword and return the matches as a JSON list.

    *query* is split on whitespace. A part matches when, for at least one
    enabled column, every keyword occurs in that column (case-insensitive
    substring match). Columns are enabled through the query-string flags
    ``l`` (location name), ``p`` (part number), ``d`` (description) and
    ``n`` (notes); any non-empty value enables a flag, so the literal
    string ``false`` enables it too.

    :param filter_dummy: URL segment that is not used by this function.
    :param query: whitespace-separated keywords.
    :returns: JSON list of rows with id, partno, description and
        location_descriptor; ``[]`` when no column is enabled or there are
        no keywords.
    """
    # (flag, column) pairs, in the order the clauses appear in the SQL.
    searchable = (('l', 'l.name'), ('p', 'partno'), ('d', 'description'), ('n', 'notes'))
    flags = request.args.to_dict()
    keywords = query.split() # Default splits with spaces
    # .get(): a missing flag used to raise KeyError (HTTP 500).
    columns = [column for flag, column in searchable if flags.get(flag)]
    # With nothing to search the old string slicing produced invalid SQL.
    if not keywords or not columns:
        return json.dumps([])
    kw_dict = {'kw' + str(i): '%' + kw + '%' for i, kw in enumerate(keywords)}
    groups = []
    for column in columns:
        conditions = ['LOWER(' + column + ') like LOWER(:' + name + ')' for name in kw_dict]
        groups.append('(' + ' AND '.join(conditions) + ')')
    s = ('select p.id,partno,description,l.name as location_descriptor '
         'from parts as p inner join locations as l on p.location_id = l.id where '
         + ' OR '.join(groups) + ';')
    r = db_engine.execute(text(s), kw_dict)
    rows = [dict(row) for row in r]
    r.close()
    return json.dumps(rows)
@app.route('/parts/map/<mapurl>')
def getMap(mapurl):
    """Serve the map image ``maps/<mapurl>`` as PNG, optionally with a marker.

    When both ``x`` and ``y`` query parameters are given, ``maps/here.png``
    is composited onto the map, centred on that pixel position.

    :param mapurl: file name of the map inside the ``maps`` directory.
    :returns: the PNG image; ``maps/404.png`` when the map cannot be opened
        because it is missing or is a directory; a 400 response when ``x``
        or ``y`` is not an integer.
    """
    try:
        mapimage = Image.open('maps/' + mapurl).convert("RGBA")
    except (FileNotFoundError, IsADirectoryError):
        # IsADirectoryError covers names such as '.' or '..'.
        return serveImage(Image.open("maps/404.png"))
    raw_x = request.args.get('x')
    raw_y = request.args.get('y')
    if raw_x is not None and raw_y is not None:
        try:
            x = int(raw_x)
            y = int(raw_y)
        except ValueError:
            # Previously an unhandled ValueError, i.e. HTTP 500.
            return 'x and y must be integers', 400
        pointer = Image.open('maps/here.png')
        pointerLayer = Image.new("RGBA", mapimage.size)
        width, height = pointer.size
        # Offset by half the marker size so it is centred on (x, y).
        pointerLayer.paste(pointer, (x - width // 2, y - height // 2))
        mapimage = Image.alpha_composite(mapimage, pointerLayer)
    return serveImage(mapimage)
@app.route('/parts/getfile/<filename>')
def getfile(filename):
    """Serve ``/srv/datasheets/<filename>.pdf``.

    :param filename: datasheet name without the ``.pdf`` extension; only
        word characters and dashes are accepted.
    :returns: the PDF file, or the text ``No injections pls.`` when the
        name contains any other character.
    """
    # Raw string avoids the invalid "\w" escape. fullmatch also rejects a
    # trailing newline, which "^...$" with re.match let through.
    if re.fullmatch(r'[\w\-]+', filename) is None:
        return 'No injections pls.'
    return send_from_directory('/srv/datasheets/', filename + '.pdf')
def _save_datasheet(upload, directory='/srv/datasheets/'):
    """Store an uploaded datasheet under a name not yet used in *directory*.

    :param upload: uploaded file object exposing ``filename`` and ``save()``.
    :param directory: target directory.
    :returns: the file name that was written, or ``None`` when the
        sanitised upload name is empty.
    """
    filename = secure_filename(upload.filename)
    if not filename:
        return None
    stem, extension = os.path.splitext(filename)
    candidate = filename
    counter = 1
    # Check the directory that is actually written to; the old check looked
    # in 'srv/datasheet/' and therefore never detected a collision.
    while os.path.isfile(os.path.join(directory, candidate)):
        candidate = stem + str(counter) + extension
        counter += 1
    upload.save(os.path.join(directory, candidate))
    return candidate


@app.route('/parts/alter/<partID>', methods=['POST'])
# @requires_auth
def alter(partID):
    """Create or update a part from the posted form and optional datasheet.

    Form fields read: ``partno``, ``description``, ``location_id``; an
    uploaded file is read from ``datasheet-file``.

    :param partID: id from the URL; a negative value inserts a new part,
        any other value updates the part with that id.
    :returns: JSON text with ``status`` and the ``part_id`` of the affected
        row, or a 404 response when the part to update does not exist.
    :raises ValueError: if *partID* is not an integer.
    """
    partID = int(partID)
    # Read the form before touching the disk so that a missing field does
    # not leave an orphaned upload behind.
    fields = {
        'partno': request.form['partno'],
        'description': request.form['description'],
        'location_id': request.form['location_id'],
    }
    upload = request.files.get('datasheet-file')
    has_upload = upload is not None and bool(upload.filename)
    stale_datasheet = None
    if partID < 0:
        # New entry
        datasheet_filename = _save_datasheet(upload) if has_upload else None
        s = text('insert into parts (partno, description, datasheet, location_id) '
                 'values (:partno, :description, :datasheet, :location_id) returning id;')
        r = db_engine.execute(s, datasheet=datasheet_filename, **fields)
    else:
        # Modify entry
        r = db_engine.execute(text('select * from parts where id=:id;'), id=partID)
        existing = r.fetchone()
        r.close()
        if existing is None:
            # Previously an IndexError, i.e. HTTP 500.
            return 'Part not found', 404
        old_datasheet = dict(existing)['datasheet']
        new_datasheet = _save_datasheet(upload) if has_upload else None
        if new_datasheet is not None:
            datasheet_filename = new_datasheet
            stale_datasheet = old_datasheet
        else:
            datasheet_filename = old_datasheet
        s = text('update parts '
                 'set partno=:partno, description=:description, datasheet=:datasheet, location_id=:location_id '
                 'where id=:id returning id;')
        r = db_engine.execute(s, datasheet=datasheet_filename, id=partID, **fields)
    new_id = r.fetchone()[0]
    r.close()
    # Remove the replaced datasheet only after the row has been updated, and
    # never the file that was just written.
    if stale_datasheet is not None and stale_datasheet != datasheet_filename:
        try:
            os.remove('/srv/datasheets/' + stale_datasheet)
        except FileNotFoundError:
            # The old file is already gone; nothing left to clean up.
            pass
    return '{"status":"ok", "part_id" : ' + str(new_id) + '}'
@app.route('/parts/delete/<partID>')
# @requires_auth
def delete(partID):
    """Delete the part with id *partID*.

    :param partID: part id taken from the URL.
    :returns: the JSON text ``{"status":"ok"}``; the request is aborted
        with HTTP 400 when the id is negative or not an integer.
    """
    # abort is not among the module's flask imports, so the original call
    # raised NameError; import it here to keep this block self-contained.
    from flask import abort

    try:
        part_id = int(partID)
    except ValueError:
        abort(400)
    if part_id < 0:
        abort(400)
    s = text('delete from parts where id=:id;')
    r = db_engine.execute(s, id=part_id)
    r.close()
    return '{"status":"ok"}'
def connect(user, password, db, host='localhost', port=5432):
    '''Returns a connection and a metadata object

    :param user: database user name.
    :param password: database password.
    :param db: database name.
    :param host: database host.
    :param port: database port.
    :returns: tuple ``(engine, metadata)``; the metadata is bound to the
        engine and has been reflected from the database.
    '''
    from urllib.parse import quote_plus

    # We connect with the help of the PostgreSQL URL. User and password are
    # URL-escaped so characters such as '@', ':' or '/' cannot break it.
    url = 'postgresql://{}:{}@{}:{}/{}'
    url = url.format(quote_plus(str(user)), quote_plus(str(password)), host, port, db)
    # The return value of create_engine() is our connection object
    con = sqlalchemy.create_engine(url, client_encoding='utf8')
    # We then bind the connection to MetaData() and reflect explicitly; the
    # reflect=True constructor flag is deprecated in favour of reflect().
    meta = sqlalchemy.MetaData(bind=con)
    meta.reflect()
    return con, meta
if __name__ == '__main__':
    # Database credentials are read from admin.json in the working directory.
    with open('admin.json') as credentials_file:
        postgres_credentials = json.load(credentials_file)
    db_engine, db_metadata = connect(
        postgres_credentials['username'],
        postgres_credentials['password'],
        'parts_v2',
    )
    parts = sqlalchemy.Table('parts', db_metadata)
    # Example query
    '''s = select([parts]).where(parts.c.notes != '')
for row in db_engine.execute(s):
print row'''
    # Listen on all interfaces.
    app.run('0.0.0.0')