You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ELAB-partsearch/parts/server.py

205 lines
7.3 KiB

import os
import re
import json
import sqlalchemy
from functools import wraps
from sqlalchemy.sql import select
from sqlalchemy.sql import text
from flask import Flask
from flask import render_template, send_from_directory, request, Response
from werkzeug.utils import secure_filename
import pprint
# Pretty-printer helper; not referenced by any of the code shown here.
pp = pprint.PrettyPrinter(indent=4)
# The Flask application on which the @app.route handlers below are registered.
app = Flask(__name__)
# Placeholders: the __main__ block rebinds both to the values returned by
# connect().
db_engine = {}
db_metadata = {}
# Placeholder: the __main__ block rebinds this to sqlalchemy.Table('parts', ...).
parts = {}
def check_auth(username, password):
    '''Check a username/password pair against the entries in edit_admin.json.

    The file is read on every call and is parsed as JSON; the code iterates
    over it and reads the 'username' and 'password' keys of each entry.
    Passwords are compared exactly as stored in the file.

    Returns True when the first entry whose username matches also has a
    matching password, and False otherwise, including when no entry has
    that username.
    '''
    with open('edit_admin.json', 'r') as admin:
        admin_list = json.load(admin)
    for user in admin_list:
        if username == user['username']:
            # The first entry with this username decides the outcome.
            return password == user['password']
    # Unknown username: deny explicitly instead of falling off the end and
    # returning None.
    return False
def authenticate():
    '''Build the 401 response that asks the client for HTTP Basic credentials.'''
    challenge_headers = {'WWW-Authenticate' : 'Basic realm="Login Required"'}
    message = 'Could not verify access level. Please retry'
    return Response(message, 401, challenge_headers)
def requires_auth(f):
    '''Decorator: run the wrapped view only for requests that pass check_auth().

    Requests without credentials, or with credentials that check_auth()
    rejects, receive the response produced by authenticate() instead.
    '''
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        if credentials and check_auth(credentials.username, credentials.password):
            return f(*args, **kwargs)
        return authenticate()
    return decorated
@app.route('/parts')
def index():
    '''Render the partsearch.html template for the /parts page.'''
    page = render_template('partsearch.html')
    return page
@app.route('/parts/getpartinfo/<partID>')
def get_part_info(partID):
    '''Return one part, joined with its location row, as a JSON object.

    partID is the id segment of the URL; it is handed to the database as a
    bound parameter.  Responds with 404 when no part has that id.
    '''
    statement = text(
        'select * from parts as p inner join locations as l '
        'on p.location_id=l.id where p.id = :id;'
    )
    result = db_engine.execute(statement, id=partID)
    try:
        rows = [dict(row) for row in result]
    finally:
        # Release the cursor even if converting a row fails.
        result.close()
    if not rows:
        # Indexing an empty result used to raise IndexError (HTTP 500).
        return Response('{"status":"not found"}', 404)
    return json.dumps(rows[0])
def _flag_enabled(value):
    '''Interpret a query-string flag; missing, '', '0', 'false', 'no', 'off' mean off.'''
    if value is None:
        return False
    return value.strip().lower() not in ('', '0', 'false', 'no', 'off')


@app.route('/parts/query/<filter_dummy>/<query>') # TODO: maybe change AND to OR or maybe not
def query(filter_dummy, query):
    '''Search parts by keyword and return the matches as a JSON list.

    query is split on whitespace into keywords.  The query-string flags
    l, p, d and n select which columns are searched (location name, part
    number, description and notes respectively).  A row matches when, for
    at least one selected column, every keyword occurs in that column
    (case-insensitive substring match).

    filter_dummy is part of the URL but is not used.

    Returns '[]' without touching the database when there are no keywords
    or no column is selected, because no valid WHERE clause can be built
    in that case.
    '''
    # (query-string flag, SQL column) pairs, in the order the clauses are emitted.
    searchable = (
        ('l', 'l.name'),
        ('p', 'partno'),
        ('d', 'description'),
        ('n', 'notes'),
    )
    keywords = query.split()  # Default splits on whitespace
    columns = [column for flag, column in searchable
               if _flag_enabled(request.args.get(flag))]
    if not keywords or not columns:
        return json.dumps([])

    # One bound parameter per keyword, wrapped in % for substring matching.
    names = ['kw' + str(i) for i in range(len(keywords))]
    kw_dict = {name: '%' + keyword + '%' for name, keyword in zip(names, keywords)}

    # Only the fixed column names above are spliced into the SQL text; the
    # user-supplied keywords always travel as bound parameters.
    groups = []
    for column in columns:
        terms = ['LOWER(' + column + ') like LOWER(:' + name + ')' for name in names]
        groups.append('(' + ' AND '.join(terms) + ')')
    statement = text(
        'select p.id,partno,description,l.name as location_descriptor '
        'from parts as p inner join locations as l on p.location_id = l.id where '
        + ' OR '.join(groups) + ';'
    )

    result = db_engine.execute(statement, kw_dict)
    try:
        rows = [dict(row) for row in result]
    finally:
        result.close()
    return json.dumps(rows)
@app.route('/parts/getfile/<filename>')
def getfile(filename):
    '''Send the file '<filename>.pdf' from /srv/datasheets/.

    Only names consisting of word characters, '-' and '_' are accepted;
    anything else is rejected with a 400 response.
    '''
    # Raw string so that \w reaches the regex engine untouched; \Z instead
    # of $ so that a name ending in a newline is not accepted.
    if re.match(r'^[\w\-_]+\Z', filename) is None:
        return 'No injections pls.', 400
    return send_from_directory('/srv/datasheets/', filename + '.pdf')
_DATASHEET_DIR = '/srv/datasheets/'


def _store_datasheet(upload):
    '''Save an uploaded datasheet under a name not yet in use; return that name.'''
    original_name = secure_filename(upload.filename)
    stem, extension = os.path.splitext(original_name)
    candidate = original_name
    counter = 1
    # Probe the directory the file is actually written to, and always derive
    # the next candidate from the original stem (foo1, foo2, ...).
    while os.path.isfile(os.path.join(_DATASHEET_DIR, candidate)):
        candidate = stem + str(counter) + extension
        counter += 1
    upload.save(os.path.join(_DATASHEET_DIR, candidate))
    return candidate


@app.route('/parts/alter/<partID>', methods=['POST'])
@requires_auth
def alter(partID):
    '''Create a new part (negative partID) or modify an existing one.

    Field values come from the POSTed form (block, type, partno, partnoalt,
    partnoalt2, description, quantity, notes).  An optional upload in the
    'datasheet-file' field is stored in /srv/datasheets/ under a unique
    name; when it replaces an existing datasheet, the old file is removed
    after the database row has been updated.

    Responds with 400 when partID is not an integer and 404 when the part
    to modify does not exist; otherwise returns a JSON "ok" status.
    '''
    try:
        partID = int(partID)
    except ValueError:
        return Response('{"status":"invalid part id"}', 400)

    # Read every form field before touching the filesystem, so that a
    # missing field cannot leave an orphaned upload behind.
    form = request.form
    block = form['block']
    fields = {
        name: form[name]
        for name in ('type', 'partno', 'partnoalt', 'partnoalt2',
                     'description', 'quantity', 'notes')
    }

    # A file input that was left empty is submitted with an empty filename;
    # treat that the same as no upload at all.
    upload = request.files.get('datasheet-file')
    has_upload = upload is not None and bool(upload.filename)

    if partID < 0:
        # New entry
        # NOTE(review): this statement writes column `block`, while the
        # queries in this file join on parts.location_id -- confirm which
        # column a new row should populate.
        statement = text(
            'insert into parts (block, type, partno, partnoalt, partnoalt2, description, quantity, datasheet, notes) '
            'values (:block, :type, :partno, :partnoalt, :partnoalt2, :description, :quantity, :datasheet, :notes);'
        )
        datasheet_filename = _store_datasheet(upload) if has_upload else None
        db_engine.execute(statement, block=block,
                          datasheet=datasheet_filename, **fields)
        return '{"status":"ok"}'

    # Modify entry
    result = db_engine.execute(text('select * from parts where id=:id;'), id=partID)
    try:
        existing = [dict(row) for row in result]
    finally:
        result.close()
    if not existing:
        return Response('{"status":"not found"}', 404)

    old_datasheet = existing[0]['datasheet']
    datasheet_filename = _store_datasheet(upload) if has_upload else old_datasheet

    # The statement used to bind :block but was executed with a
    # location_id= argument, so the parameter never received a value.  The
    # column and parameter now both use location_id, the column the read
    # queries join on.
    statement = text(
        'update parts '
        'set location_id=:location_id, type=:type, partno=:partno, partnoalt=:partnoalt, partnoalt2=:partnoalt2, description=:description, quantity=:quantity, datasheet=:datasheet, notes=:notes '
        'where id=:id;'
    )
    db_engine.execute(statement, location_id=block,
                      datasheet=datasheet_filename, id=partID, **fields)

    # Remove the replaced file only once the row points at the new one, and
    # tolerate it already being gone.
    if has_upload and old_datasheet is not None:
        old_path = os.path.join(_DATASHEET_DIR, old_datasheet)
        if os.path.isfile(old_path):
            os.remove(old_path)
    return '{"status":"ok"}'
@app.route('/parts/delete/<partID>')
@requires_auth
def delete(partID):
    '''Delete the part whose id is given in the URL.

    Responds with 400 when the id is not a non-negative integer; otherwise
    issues the delete and returns a JSON "ok" status.
    '''
    # The URL segment arrives as a string, so it has to be converted before
    # it can be compared with 0.  Response is used for the rejection because
    # `abort` is not among the names imported at the top of this file.
    try:
        part_id = int(partID)
    except ValueError:
        return Response('{"status":"invalid part id"}', 400)
    if part_id < 0:
        return Response('{"status":"invalid part id"}', 400)
    db_engine.execute(text('delete from parts where id=:id;'), id=part_id)
    return '{"status":"ok"}'
def connect(user, password, db, host='localhost', port=5432):
    '''Returns a connection and a metadata object

    Builds a PostgreSQL URL from the arguments, creates an SQLAlchemy engine
    for it and reflects the database schema into a MetaData object bound to
    that engine.  The return value is the tuple (engine, metadata).
    '''
    try:
        from urllib.parse import quote
    except ImportError:  # Python 2
        from urllib import quote

    # Percent-encode the credentials so that characters such as '@', ':' or
    # '/' in them cannot be mistaken for URL delimiters.
    url = 'postgresql://{}:{}@{}:{}/{}'.format(
        quote(user, safe=''), quote(password, safe=''), host, port, db)
    # The return value of create_engine() is our connection object
    con = sqlalchemy.create_engine(url, client_encoding='utf8')
    # Bind the metadata to the engine, then load the table definitions with
    # reflect(); the reflect=True constructor flag is deprecated.
    meta = sqlalchemy.MetaData(bind=con)
    meta.reflect()
    return con, meta
# Script entry point: load the database credentials, connect, then start the
# Flask server.
if __name__ == '__main__':
    # Credentials are read from admin.json, resolved against the current
    # working directory.
    with open('admin.json') as f:
        postgres_credentials = json.load(f)
    # Rebind the module-level placeholders to the real engine and metadata.
    db_engine, db_metadata = connect(postgres_credentials['username'], postgres_credentials['password'], 'parts_v2')
    parts = sqlalchemy.Table('parts', db_metadata)
    # Example query
    # (kept as a bare string literal, so it is never executed)
    '''s = select([parts]).where(parts.c.notes != '')
for row in db_engine.execute(s):
print row'''
    # '0.0.0.0' is passed as the host argument of app.run().
    app.run('0.0.0.0')