You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
271 lines
9.1 KiB
271 lines
9.1 KiB
import os
|
|
import re
|
|
import json
|
|
import sqlalchemy
|
|
from functools import wraps
|
|
from sqlalchemy.sql import select
|
|
from sqlalchemy.sql import text
|
|
from flask import Flask
|
|
from flask import render_template, send_from_directory, request, Response, send_file
|
|
from PIL import Image, ImageDraw
|
|
from io import BytesIO
|
|
from os import listdir
|
|
from os.path import isfile, join
|
|
from werkzeug.utils import secure_filename
|
|
|
|
app = Flask(__name__)
|
|
|
|
db_engine = {}
|
|
db_metadata = {}
|
|
parts = {}
|
|
|
|
def check_auth(username, password):
|
|
admin_list = []
|
|
with open('edit_admin.json', 'r') as admin:
|
|
admin_list = json.load(admin)
|
|
for user in admin_list:
|
|
if username == user['username']:
|
|
return password == user['password']
|
|
|
|
def authenticate():
|
|
return Response('Could not verify access level. Please retry', 401, {'WWW-Authenticate' : 'Basic realm="Login Required"'})
|
|
|
|
def requires_auth(f):
|
|
@wraps(f)
|
|
def decorated(*args, **kwargs):
|
|
auth = request.authorization
|
|
if not auth or not check_auth(auth.username, auth.password):
|
|
return authenticate()
|
|
return f(*args, **kwargs)
|
|
return decorated
|
|
|
|
def serveImage(img):
|
|
img_io = BytesIO()
|
|
img.convert("RGB").save(img_io, 'PNG')
|
|
img_io.seek(0)
|
|
return send_file(img_io, mimetype='image/png')
|
|
|
|
@app.route('/parts')
|
|
def index():
|
|
return render_template('partsearch.html')
|
|
|
|
@app.route('/parts/getlocations/')
|
|
def get_locations():
|
|
s = 'select id, name from locations order by name;'
|
|
r = db_engine.execute(text(s))
|
|
l = {}
|
|
for row in r:
|
|
l[row[0]]=row[1]
|
|
r.close()
|
|
return json.dumps(l)
|
|
|
|
@app.route('/parts/getlocationURL/<locationID>')
|
|
def get_locationURL(locationID):
|
|
s = 'select map from locations where id = :id;'
|
|
r = db_engine.execute(text(s), id=locationID)
|
|
l=[];
|
|
for row in r:
|
|
l.append(row);
|
|
r.close()
|
|
return l[0][0]
|
|
|
|
@app.route('/parts/locationEditor')
|
|
def locationEditor():
|
|
query = 'select * from locations order by name;'
|
|
r = db_engine.execute(text(query))
|
|
locations = []
|
|
for row in r:
|
|
locations.append(dict(row))
|
|
r.close()
|
|
know_not_map_files = ['here.png', '404.png', '.DS_Store']
|
|
mapfiles = [f for f in listdir('maps') if isfile(join('maps', f)) and f not in know_not_map_files]
|
|
return render_template('locationEditor.html', locations=locations, mapfiles=mapfiles, defaultMapfile="elab.png")
|
|
|
|
@app.route('/parts/alterLocation/<locationID>', methods=['POST'])
|
|
# @requires_auth
|
|
def alterLocation(locationID):
|
|
locationID = int(locationID)
|
|
s = ''
|
|
if locationID < 0:
|
|
# New entry
|
|
s = 'insert into locations (name, map) '
|
|
s += 'values (:name, :map);'
|
|
s = text(s)
|
|
r = db_engine.execute(s,name=request.form['name'],map=request.form['map']);
|
|
r.close()
|
|
return '{"status":"ok"}'
|
|
else:
|
|
# Modify entry
|
|
s = 'update locations '
|
|
s += 'set name=:name, map=:map, '
|
|
s += 'where id=:locationID;'
|
|
s = text(s)
|
|
r = db_engine.execute(s, name=request.form['name'],map=request.form['map'],locationID=locationID);
|
|
r.close()
|
|
return '{"status":"ok"}'
|
|
|
|
@app.route('/parts/getpartinfo/<partID>')
|
|
def get_part_info(partID):
|
|
s = 'select * from parts as p inner join locations as l on p.location_id=l.id where p.id = :id;'
|
|
r = db_engine.execute(text(s), id=partID)
|
|
l = []
|
|
for row in r:
|
|
l.append(dict(row))
|
|
r.close()
|
|
return json.dumps(l[0])
|
|
|
|
@app.route('/parts/query/<filter_dummy>/<query>') # TODO: maybe change AND to OR or maybe not
|
|
def query(filter_dummy, query):
|
|
filter = request.args.to_dict()
|
|
keywords = query.split() # Default splits with spaces
|
|
for i in range(len(keywords)):
|
|
keywords[i] = '%' + keywords[i] + '%'
|
|
kw_dict = {}
|
|
for i in range(len(keywords)):
|
|
kw_dict["kw" + str(i)] = keywords[i]
|
|
|
|
s = 'select p.id,partno,description,l.name as location_descriptor from parts as p inner join locations as l on p.location_id = l.id where '
|
|
if filter['l']:
|
|
s += '('
|
|
for i in range(len(keywords)):
|
|
s += 'LOWER(l.name) like LOWER(:kw'+ str(i) +') AND '
|
|
s = s[:-5]
|
|
s += ') OR '
|
|
if filter['p']:
|
|
s += '('
|
|
for i in range(len(keywords)):
|
|
s += 'LOWER(partno) like LOWER(:kw'+ str(i) +') AND '
|
|
s = s[:-5]
|
|
s += ') OR '
|
|
if filter['d']:
|
|
s += '('
|
|
for i in range(len(keywords)):
|
|
s += 'LOWER(description) like LOWER(:kw'+ str(i) +') AND '
|
|
s = s[:-5]
|
|
s += ') OR '
|
|
if filter['n']:
|
|
s += '('
|
|
for i in range(len(keywords)):
|
|
s += 'LOWER(notes) like LOWER(:kw'+ str(i) +') AND '
|
|
s = s[:-5]
|
|
s += ') OR '
|
|
s = s[:-4] + ';'
|
|
s = text(s)
|
|
r = db_engine.execute(s, kw_dict)
|
|
l = []
|
|
for row in r:
|
|
l.append(dict(row))
|
|
r.close()
|
|
return json.dumps(l)
|
|
|
|
@app.route('/parts/map/<mapurl>')
|
|
def getMap(mapurl):
|
|
try:
|
|
mapimage = Image.open('maps/' + mapurl).convert("RGBA")
|
|
except FileNotFoundError:
|
|
return serveImage(Image.open("maps/404.png"))
|
|
if request.args.get('x') is not None and request.args.get('y') is not None:
|
|
x = int(request.args.get('x'))
|
|
y = int(request.args.get('y'))
|
|
pointer = Image.open('maps/here.png')
|
|
pointerLayer = Image.new("RGBA", mapimage.size)
|
|
width, height = pointer.size
|
|
pointerLayer.paste(pointer, (x - int(width/2), y-int(height/2)))
|
|
mapimage = Image.alpha_composite(mapimage, pointerLayer)
|
|
return serveImage(mapimage)
|
|
|
|
|
|
@app.route('/parts/getfile/<filename>')
|
|
def getfile(filename):
|
|
if(re.match('^[\w\-_]+$', filename) == None):
|
|
return 'No injections pls.'
|
|
|
|
return send_from_directory('/srv/datasheets/', filename + '.pdf')
|
|
|
|
@app.route('/parts/alter/<partID>', methods=['POST'])
|
|
# @requires_auth
|
|
def alter(partID):
|
|
partID = int(partID)
|
|
s = ''
|
|
if partID < 0:
|
|
# New entry
|
|
s = 'insert into parts (partno, description, datasheet, location_id) '
|
|
s += 'values (:partno, :description, :datasheet, :location_id);'
|
|
s = text(s)
|
|
if len(request.files) != 0:
|
|
datasheet_file = request.files['datasheet-file']
|
|
datasheet_filename = secure_filename(datasheet_file.filename)
|
|
i = 1
|
|
while os.path.isfile('srv/datasheet/' + datasheet_filename):
|
|
datasheet_filename = datasheet_filename[:-4] + str(i) + '.pdf'
|
|
i += 1
|
|
datasheet_file.save('/srv/datasheets/' + datasheet_filename)
|
|
else:
|
|
datasheet_filename = None
|
|
r = db_engine.execute(s, partno=request.form['partno'],
|
|
description=request.form['description'],
|
|
datasheet=datasheet_filename,
|
|
location_id=request.form['location_id'])
|
|
else:
|
|
# Modify entry
|
|
r = db_engine.execute(text('select * from parts where id=:id;'), id=partID)
|
|
l = []
|
|
for row in r:
|
|
l.append(dict(row))
|
|
r.close()
|
|
s = 'update parts '
|
|
s += 'set partno=:partno, description=:description, datasheet=:datasheet, location_id=:location_id '
|
|
if len(request.files) != 0:
|
|
datasheet_file = request.files['datasheet-file']
|
|
datasheet_filename = secure_filename(datasheet_file.filename)
|
|
i = 1
|
|
while os.path.isfile('srv/datasheet/' + datasheet_filename):
|
|
datasheet_filename = datasheet_filename[:-4] + str(i) + '.pdf'
|
|
i += 1
|
|
datasheet_file.save('/srv/datasheets/' + datasheet_filename)
|
|
if l[0]['datasheet'] != None:
|
|
os.remove('/srv/datasheets/' + l[0]['datasheet'])
|
|
else:
|
|
datasheet_filename = l[0]['datasheet']
|
|
s += 'where id=:id;'
|
|
s = text(s)
|
|
r = db_engine.execute(s, partno=request.form['partno'],
|
|
description=request.form['description'],
|
|
datasheet=datasheet_filename,
|
|
location_id=request.form['location_id'])
|
|
return '{"status":"ok"}'
|
|
|
|
@app.route('/parts/delete/<partID>')
|
|
@requires_auth
|
|
def delete(partID):
|
|
if partID < 0:
|
|
abort(400)
|
|
s = text('delete from parts where id=:id;')
|
|
r = db_engine.execute(s, id=partID)
|
|
return '{"status":"ok"}'
|
|
|
|
def connect(user, password, db, host='localhost', port=5432):
|
|
'''Returns a connection and a metadata object'''
|
|
# We connect with the help of the PostgreSQL URL
|
|
url = 'postgresql://{}:{}@{}:{}/{}'
|
|
url = url.format(user, password, host, port, db)
|
|
|
|
# The return value of create_engine() is our connection object
|
|
con = sqlalchemy.create_engine(url, client_encoding='utf8')
|
|
|
|
# We then bind the connection to MetaData()
|
|
meta = sqlalchemy.MetaData(bind=con, reflect=True)
|
|
|
|
return con, meta
|
|
|
|
if __name__ == '__main__':
|
|
with open('admin.json') as f:
|
|
postgres_credentials = json.load(f)
|
|
db_engine, db_metadata = connect(postgres_credentials['username'], postgres_credentials['password'], 'parts_v2')
|
|
parts = sqlalchemy.Table('parts', db_metadata)
|
|
# Example query
|
|
'''s = select([parts]).where(parts.c.notes != '')
|
|
for row in db_engine.execute(s):
|
|
print row'''
|
|
app.run('0.0.0.0')
|