"""Flask web service for searching and editing an electronic parts inventory.

The service talks to a PostgreSQL database through a SQLAlchemy engine and
serves container map images and datasheet PDFs from the local file system.
"""

import hmac
import json
import os
import re
from functools import wraps
from io import BytesIO
from os import listdir
from os.path import isfile, join
from urllib.parse import quote_plus

import sqlalchemy
from flask import Flask
from flask import abort
from flask import render_template, send_from_directory, request, Response, send_file
from PIL import Image, ImageDraw
from sqlalchemy.sql import select
from sqlalchemy.sql import text
from werkzeug.utils import secure_filename

app = Flask(__name__)

# Populated by the ``__main__`` block below once the database is reachable.
db_engine = {}
db_metadata = {}
parts = {}

# Single source of truth for where datasheets are stored and served from.
DATASHEET_DIR = '/srv/datasheets/'

# Image served whenever a container map cannot be produced.
MAP_NOT_FOUND_IMAGE = "maps/404.png"


def _fetch_all(statement, **params):
    """Execute *statement* and return every row as a ``dict``.

    The result is closed even if converting a row raises, so the pooled
    connection is always released.
    """
    result = db_engine.execute(text(statement), **params)
    try:
        return [dict(row) for row in result]
    finally:
        result.close()


def _execute(statement, **params):
    """Execute a statement whose rows (if any) are not needed."""
    db_engine.execute(text(statement), **params).close()


def _parse_id(raw):
    """Return *raw* as an ``int``; abort with HTTP 400 if it is not numeric."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        abort(400)


def getContainers():
    """Return all containers as ``{'id', 'name'}`` dicts, sorted by name."""
    return _fetch_all("select id, name from containers order by UPPER(name);")


def check_auth(username, password):
    """Return True if *username*/*password* match an entry in edit_admin.json.

    Unknown users and non-string credentials yield False.
    """
    with open('edit_admin.json', 'r') as admin:
        admin_list = json.load(admin)
    for user in admin_list:
        if username == user['username']:
            stored = user['password']
            if isinstance(password, str) and isinstance(stored, str):
                # Constant-time comparison avoids leaking how many leading
                # characters of the password were correct.
                return hmac.compare_digest(password.encode('utf-8'),
                                           stored.encode('utf-8'))
            return False
    return False


def authenticate():
    """Build the 401 response that makes the browser prompt for credentials."""
    return Response('Could not verify access level. Please retry', 401,
                    {'WWW-Authenticate': 'Basic realm="Login Required"'})


def requires_auth(f):
    """Decorator: require valid HTTP basic auth before calling the view *f*."""
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if not auth or not check_auth(auth.username, auth.password):
            return authenticate()
        return f(*args, **kwargs)
    return decorated


def serveImage(img):
    """Send a PIL image to the client as an in-memory PNG."""
    img_io = BytesIO()
    img.convert("RGB").save(img_io, 'PNG')
    img_io.seek(0)
    return send_file(img_io, mimetype='image/png')


@app.route('/parts', strict_slashes=False)
def index():
    """Render the part search page."""
    return render_template('partsearch.html', containers=getContainers())


# The URL variable is required: the view takes ``containerID`` and Flask
# passes route variables as keyword arguments.
@app.route('/parts/getlocationsInContainer/<containerID>')
def get_locations_in_container(containerID):
    """Return a JSON object mapping location id to name for one container."""
    rows = _fetch_all(
        'select id, name from locations where container_id = :id order by name;',
        id=containerID)
    return json.dumps({row['id']: row['name'] for row in rows})


@app.route('/parts/getlocationURL/<locationID>')
def get_locationURL(locationID):
    """Return the ``map`` value stored for a location (404 if there is none)."""
    rows = _fetch_all('select map from locations where id = :id;', id=locationID)
    if not rows or rows[0]['map'] is None:
        abort(404)
    return rows[0]['map']


@app.route('/parts/locationEditor')
def locationEditor():
    """Render the location editor with every location and container."""
    locations = _fetch_all(
        'select c.name as container, l.name as name, l.id, c.id as container_id '
        'from locations as l inner join containers as c on l.container_id = c.id '
        'order by container, name;')
    return render_template('locationEditor.html', locations=locations,
                           containers=getContainers())


# NOTE(review): @requires_auth is disabled here, so anyone can edit locations.
@app.route('/parts/alterLocation/<locationID>', methods=['POST'])
# @requires_auth
def alterLocation(locationID):
    """Create (negative *locationID*) or update a location from form data.

    Expects the form fields ``name`` and ``container``.
    """
    locationID = _parse_id(locationID)
    name = request.form['name']
    container = request.form['container']
    if locationID < 0:
        # New entry
        _execute('insert into locations (name, container_id) '
                 'values (:name, :container);',
                 name=name, container=container)
    else:
        # Modify entry
        _execute('update locations '
                 'set name=:name, container_id=:container '
                 'where id=:locationID;',
                 name=name, container=container, locationID=locationID)
    return '{"status":"ok"}'


@app.route('/parts/getpartinfo/<partID>')
def get_part_info(partID):
    """Return one part, including its location descriptor, as JSON (404 if unknown)."""
    rows = _fetch_all(
        'select p.id,partno,description, c.name || l.name as location_descriptor, '
        'location_id, container_id, datasheet '
        'from parts as p inner join locations as l on p.location_id = l.id '
        'inner join containers as c on l.container_id = c.id where p.id = :id;',
        id=partID)
    if not rows:
        abort(404)
    return json.dumps(rows[0])


# TODO: maybe change AND to OR or maybe not
@app.route('/parts/query/<filter_dummy>/<query>')
def query(filter_dummy, query):
    """Search parts for rows matching every whitespace-separated keyword.

    The query-string flags ``l``, ``p`` and ``d`` (value ``'true'``) select
    which columns are searched: location name, part number and description.
    A row matches when all keywords occur, case-insensitively, in at least
    one selected column.  The flag ``n`` additionally restricts the result
    to parts that have a non-empty datasheet.

    Returns a JSON list; it is empty when no column is selected or when the
    query contains no keywords.  *filter_dummy* is unused.
    """
    flags = request.args.to_dict()
    keywords = ['%' + word + '%' for word in query.split()]
    params = {'kw' + str(i): keyword for i, keyword in enumerate(keywords)}

    def all_keywords_in(column):
        # One parenthesised group: every keyword must match this column.
        return '(' + ' AND '.join(
            'LOWER(' + column + ') like LOWER(:kw' + str(i) + ')'
            for i in range(len(keywords))) + ')'

    searchable = (('l', 'l.name'), ('p', 'partno'), ('d', 'description'))
    groups = []
    if keywords:
        groups = [all_keywords_in(column)
                  for flag, column in searchable
                  if flags.get(flag) == 'true']
    if not groups:
        # Nothing to search in; building SQL here would be syntactically invalid.
        return json.dumps([])

    condition = '(' + ' OR '.join(groups) + ')'
    if flags.get('n') == 'true':
        # Applied to the whole OR group; without the parentheses above, AND
        # would bind only to the last column group.
        condition += " AND (datasheet='' IS NOT FALSE) IS FALSE"

    s = ('select p.id,partno,description, c.name || l.name as location_descriptor '
         'from parts as p inner join locations as l on p.location_id = l.id '
         'inner join containers as c on l.container_id = c.id where '
         + condition + ';')
    app.logger.debug(s)
    return json.dumps(_fetch_all(s, **params))


@app.route('/parts/map/<containerID>')
def getMap(containerID):
    """Serve a container's map with its overlay composited on top.

    Falls back to the 404 image when the container is unknown, has no map or
    overlay configured, or an image file is missing.
    """
    rows = _fetch_all('select map, overlay from containers where id = :id;',
                      id=containerID)
    if not rows or not rows[0]['map'] or not rows[0]['overlay']:
        return serveImage(Image.open(MAP_NOT_FOUND_IMAGE))
    try:
        mapImage = Image.open('maps/' + rows[0]['map']).convert("RGBA")
        overlayImage = Image.open('maps/overlays/' + rows[0]['overlay']).convert("RGBA")
    except FileNotFoundError:
        return serveImage(Image.open(MAP_NOT_FOUND_IMAGE))
    # TODO: optionally paste 'maps/here.png' centred on the x/y query
    # parameters to mark a position on the map.
    return serveImage(Image.alpha_composite(mapImage, overlayImage))


@app.route('/parts/getfile/<filename>')
def getfile(filename):
    """Serve ``<filename>.pdf`` from the datasheet directory."""
    # fullmatch (unlike match with '$') also rejects a trailing newline.
    if re.fullmatch(r'[\w\-]+', filename) is None:
        return 'No injections pls.'
    return send_from_directory(DATASHEET_DIR, filename + '.pdf')


def _store_uploaded_datasheet():
    """Save the uploaded ``datasheet-file``, if any, under a unique name.

    Returns the stored file name, or None when no usable file was uploaded.
    """
    upload = request.files.get('datasheet-file')
    if upload is None:
        return None
    filename = secure_filename(upload.filename or '')
    if not filename:
        # An empty file field was submitted, or the name sanitised to nothing.
        return None
    stem, extension = os.path.splitext(filename)
    candidate = filename
    counter = 1
    # Probe the same directory the file is saved to, so an existing datasheet
    # is never overwritten.
    while os.path.isfile(os.path.join(DATASHEET_DIR, candidate)):
        candidate = stem + str(counter) + extension
        counter += 1
    upload.save(os.path.join(DATASHEET_DIR, candidate))
    return candidate


# NOTE(review): @requires_auth is disabled here, so anyone can edit parts.
@app.route('/parts/alter/<partID>', methods=['POST'])
# @requires_auth
def alter(partID):
    """Create (negative *partID*) or update a part from form data.

    Expects the form fields ``partno``, ``description`` and ``location_id``
    and optionally an uploaded ``datasheet-file``.  When an update uploads a
    new datasheet, the previous file is deleted after the row is updated.
    Returns a JSON status object containing the part id.
    """
    partID = _parse_id(partID)
    fields = dict(partno=request.form['partno'],
                  description=request.form['description'],
                  location_id=request.form['location_id'])

    if partID < 0:
        # New entry
        rows = _fetch_all(
            'insert into parts (partno, description, datasheet, location_id) '
            'values (:partno, :description, :datasheet, :location_id) returning id;',
            datasheet=_store_uploaded_datasheet(), **fields)
    else:
        # Modify entry
        existing = _fetch_all('select * from parts where id=:id;', id=partID)
        if not existing:
            abort(404)
        old_datasheet = existing[0]['datasheet']
        new_datasheet = _store_uploaded_datasheet()
        datasheet_filename = old_datasheet if new_datasheet is None else new_datasheet
        rows = _fetch_all(
            'update parts '
            'set partno=:partno, description=:description, datasheet=:datasheet, '
            'location_id=:location_id '
            'where id=:id returning id;',
            datasheet=datasheet_filename, id=partID, **fields)
        # Remove the replaced file only after the row points at the new one.
        if new_datasheet is not None and old_datasheet and old_datasheet != new_datasheet:
            try:
                os.remove(os.path.join(DATASHEET_DIR, old_datasheet))
            except FileNotFoundError:
                pass  # Already gone; nothing left to clean up.

    new_id = rows[0]['id']
    return '{"status":"ok", "part_id" : ' + str(new_id) + '}'


# NOTE(review): @requires_auth is disabled here, so anyone can delete parts.
@app.route('/parts/delete/<partID>')
# @requires_auth
def delete(partID):
    """Delete a part; negative or non-numeric ids are rejected with HTTP 400."""
    partID = _parse_id(partID)
    if partID < 0:
        abort(400)
    _execute('delete from parts where id=:id;', id=partID)
    return '{"status":"ok"}'


def connect(user, password, db, host='localhost', port=5432):
    '''Returns a connection and a metadata object'''
    # We connect with the help of the PostgreSQL URL.  Credentials are
    # percent-encoded so characters such as '@' or '/' cannot corrupt it.
    url = 'postgresql://{}:{}@{}:{}/{}'
    url = url.format(quote_plus(str(user)), quote_plus(str(password)), host, port, db)

    # The return value of create_engine() is our connection object
    con = sqlalchemy.create_engine(url, client_encoding='utf8')

    # We then bind the connection to MetaData()
    meta = sqlalchemy.MetaData(bind=con, reflect=True)

    return con, meta


if __name__ == '__main__':
    with open('admin.json') as f:
        postgres_credentials = json.load(f)
    db_engine, db_metadata = connect(postgres_credentials['username'],
                                     postgres_credentials['password'],
                                     'parts_v2')
    parts = sqlalchemy.Table('parts', db_metadata)
    app.run('0.0.0.0')