You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ELAB-partsearch/parts/server.py

410 lines
15 KiB

import hmac
import json
import os
import random
import re
from functools import wraps
from io import BytesIO
from os import listdir
from os.path import isfile, join

import requests
import sqlalchemy
from flask import Flask
from flask import abort, render_template, send_from_directory, request, Response, send_file, session
from PIL import Image, ImageDraw
from sqlalchemy.sql import select
from sqlalchemy.sql import text
from werkzeug.utils import secure_filename
# Flask application object; the route decorators below register on it.
app = Flask(__name__)

# Placeholders that the ``__main__`` section replaces with the SQLAlchemy
# engine, the reflected metadata and the ``parts`` table once credentials
# have been loaded.
db_engine = {}
db_metadata = {}
parts = {}

# Empty string means the Octopart integration is disabled.
octopartURL = ""

# URL prefix shared by every route of this service.
baseURL = "/parts"

# One tagline per line of taglines.txt.  Only the line terminator is removed,
# so a final line without a trailing newline keeps its last character.
with open('taglines.txt', 'r') as infile:
    taglines = [line.rstrip('\n') for line in infile]
def getContainers():
    """Return every container as a list of ``{'id', 'name'}`` dicts.

    Rows are ordered case-insensitively by name (``UPPER(name)``).
    """
    rows = db_engine.execute(text("select id, name from containers order by UPPER(name);"))
    containers = [dict(row) for row in rows]
    rows.close()
    return containers
def check_auth(username, password):
    """Check basic-auth credentials against the ``users`` table.

    On success the matching user's id is stored in ``session['uid']``.

    Args:
        username: User name supplied by the client.
        password: Password supplied by the client.

    Returns:
        True when exactly one user row matches ``username`` and its stored
        password equals ``password``; False otherwise.
    """
    query = "select id, password from users where username=:usrnm;"
    r = db_engine.execute(text(query), usrnm=username)
    results = [dict(row) for row in r]
    r.close()

    # Zero matches means an unknown user; more than one is ambiguous.
    if len(results) != 1:
        return False

    stored = results[0]['password']
    # A NULL stored password or a missing supplied password must never
    # authenticate (``None == None`` would otherwise succeed).
    if not isinstance(stored, str) or not isinstance(password, str):
        return False

    # Constant-time comparison; bytes are used because compare_digest()
    # rejects non-ASCII str arguments.
    if not hmac.compare_digest(stored.encode('utf-8'), password.encode('utf-8')):
        return False

    session['uid'] = results[0]['id']
    return True
def authenticate():
    """Build the 401 response that asks the client for basic-auth credentials."""
    challenge_headers = {'WWW-Authenticate' : 'Basic realm="Login Required"'}
    return Response('Could not verify access level. Please retry', 401, challenge_headers)
def requires_auth(f):
    """Decorator: run the wrapped view only for valid basic-auth credentials.

    Requests without credentials, or with credentials that ``check_auth``
    rejects, receive the response produced by ``authenticate()`` instead.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        credentials = request.authorization
        if credentials and check_auth(credentials.username, credentials.password):
            return f(*args, **kwargs)
        return authenticate()
    return guarded_view
def serveImage(img):
    """Send a PIL image to the client as an in-memory PNG.

    The image is converted to RGB before encoding.
    """
    png_buffer = BytesIO()
    rgb_image = img.convert("RGB")
    rgb_image.save(png_buffer, 'PNG')
    # Rewind so send_file() reads the encoded bytes from the start.
    png_buffer.seek(0)
    return send_file(png_buffer, mimetype='image/png')
@app.route(baseURL, strict_slashes=True)
def index():
    """Render the part-search page with the container list and a random tagline."""
    # random.choice() raises IndexError on an empty sequence, so fall back to
    # an empty tagline when taglines.txt contained no lines.
    tagline = random.choice(taglines) if taglines else ''
    return render_template('partsearch.html', containers=getContainers(), baseURL=baseURL, tagline=tagline)
@app.route(baseURL+'/getlocationsInContainer/<containerID>')
def get_locations_in_container(containerID):
    """Return the locations of one container as a JSON object.

    The object maps each location id (as a string) to the location name.
    """
    rows = db_engine.execute(
        text('select id, name from locations where container_id = :id order by name;'),
        id=containerID,
    )
    names_by_id = {str(row[0]): row[1] for row in rows}
    rows.close()
    return json.dumps(names_by_id)
@app.route(baseURL+'/getlocationURL/<locationID>')
def get_locationURL(locationID):
    """Return the ``map`` column of the location with the given id.

    Responds with 404 when no such location exists or its ``map`` is NULL,
    instead of failing with an unhandled exception.
    """
    s = 'select map from locations where id = :id;'
    r = db_engine.execute(text(s), id=locationID)
    row = r.fetchone()
    r.close()
    if row is None or row[0] is None:
        abort(404)
    return row[0]
@app.route(baseURL+'/locationEditor')
def locationEditor():
    """Render the location editor with all locations joined to their container."""
    rows = db_engine.execute(text(
        'select c.name as container, l.name as name, l.id, c.id as container_id from locations as l inner join containers as c on l.container_id = c.id order by container, name;'
    ))
    locations = [dict(row) for row in rows]
    rows.close()
    return render_template(
        'locationEditor.html',
        locations=locations,
        containers=getContainers(),
        baseURL=baseURL,
    )
@app.route(baseURL+'/userEditor')
def userEditor():
    """Render the user editor with the id and username of every user."""
    query = 'select id, username from users;'
    r = db_engine.execute(text(query))
    users = [dict(row) for row in r]
    r.close()
    return render_template('userEditor.html', users=users, containers=getContainers(),baseURL=baseURL)
@app.route(baseURL+'/alterLocation/<locationID>', methods=['POST'])
@requires_auth
def alterLocation(locationID):
    """Create or update a location from the posted ``name``/``container`` fields.

    A negative ``locationID`` inserts a new location; any other value updates
    the location with that id.
    """
    locationID = int(locationID)
    creating = locationID < 0
    if creating:
        statement = text('insert into locations (name, container_id) values (:name, :container);')
    else:
        statement = text('update locations set name=:name, container_id=:container where id=:locationID;')

    params = {'name': request.form['name'], 'container': request.form['container']}
    if not creating:
        params['locationID'] = locationID

    result = db_engine.execute(statement, **params)
    result.close()
    return '{"status":"ok"}'
@app.route(baseURL+'/alterUser/<userID>', methods=['POST'])
@requires_auth
def alterUser(userID):
    """Create or update a user from the posted ``name``/``password`` fields.

    A negative ``userID`` inserts a new user; any other value updates the
    user with that id.  The password is written to the database exactly as
    received.
    """
    userID = int(userID)
    creating = userID < 0
    if creating:
        statement = text('insert into users (username, password) values (:name, :password);')
    else:
        statement = text('update users set username=:name, password=:password where id=:userID;')

    params = {'name': request.form['name'], 'password': request.form['password']}
    if not creating:
        params['userID'] = userID

    result = db_engine.execute(statement, **params)
    result.close()
    return '{"status":"ok"}'
@app.route(baseURL+'/getpartinfo/<partID>')
def get_part_info(partID):
    """Return one part, joined with its location and container, as JSON.

    Responds with 404 when no part has the given id, instead of failing
    with an unhandled IndexError.
    """
    s = 'select p.id,partno,description,notes, c.name || l.name as location_descriptor, location_id, container_id, datasheet from parts as p inner join locations as l on p.location_id = l.id inner join containers as c on l.container_id = c.id where p.id = :id;'
    r = db_engine.execute(text(s), id=partID)
    row = r.fetchone()
    r.close()
    if row is None:
        abort(404)
    return json.dumps(dict(row))
@app.route(baseURL+'/query/<filter_dummy>/<query>') # TODO: maybe change AND to OR or maybe not
def query(filter_dummy, query):
    """Search parts by keyword and return the matches as a JSON list.

    ``query`` is split on whitespace.  A part matches when, for at least one
    enabled field, every keyword occurs in that field (case-insensitive
    substring match).  Fields are enabled through the query-string flags
    ``l`` (container + location name), ``p`` (part number) and
    ``d`` (description); a flag is on only when its value is ``'true'``.

    An empty JSON list is returned when no keyword or no field is given, so
    no malformed SQL is ever built.

    Args:
        filter_dummy: Unused path segment kept for URL compatibility.
        query: Whitespace-separated search keywords.
    """
    keywords = ['%' + word + '%' for word in query.split()]

    # (query-string flag, SQL expression searched when the flag is 'true')
    searchable = (
        ('l', 'LOWER(c.name||l.name)'),
        ('p', 'LOWER(partno)'),
        ('d', 'LOWER(description)'),
    )
    columns = [column for flag, column in searchable if request.args.get(flag) == 'true']

    if not keywords or not columns:
        return json.dumps([])

    # Keywords are passed as bound parameters :kw0, :kw1, ...
    kw_dict = {'kw' + str(i): keyword for i, keyword in enumerate(keywords)}

    groups = []
    for column in columns:
        terms = ' AND '.join(
            column + ' like LOWER(:kw' + str(i) + ')' for i in range(len(keywords))
        )
        groups.append('(' + terms + ')')

    s = 'select p.id,partno,description, c.name || l.name as location_descriptor from parts as p inner join locations as l on p.location_id = l.id inner join containers as c on l.container_id = c.id where '
    s += ' OR '.join(groups) + ';'

    r = db_engine.execute(text(s), kw_dict)
    matches = [dict(row) for row in r]
    r.close()
    return json.dumps(matches)
@app.route(baseURL+'/map/<containerID>')
@app.route(baseURL+'/map/<containerID>/<annotate>')
def getMap(containerID, annotate=None):
    """Serve a container's map image with its overlay composited on top.

    When ``annotate`` is ``'a'`` and a ``<name>-annotated<ext>`` file exists
    next to the map, it is composited on top as well.  The placeholder image
    ``maps/404.png`` is served when the container is unknown, has no map or
    overlay recorded, or when either file is missing on disk.

    Args:
        containerID: Id of the container whose map is requested.
        annotate: ``'a'`` to include the annotation layer; anything else
            leaves it out.
    """
    s = 'select map, overlay from containers where id = :id;'
    r = db_engine.execute(text(s), id=containerID)
    row = r.fetchone()
    r.close()
    if row is None:
        return serveImage(Image.open("maps/404.png"))

    record = dict(row)
    mapFile = record['map']
    overlayFile = record['overlay']
    if not mapFile or not overlayFile:
        return serveImage(Image.open("maps/404.png"))

    try:
        mapImage = Image.open('maps/' + mapFile).convert("RGBA")
        overlayImage = Image.open('maps/overlays/' + overlayFile).convert("RGBA")
    except FileNotFoundError:
        return serveImage(Image.open("maps/404.png"))

    mapimage = Image.alpha_composite(mapImage, overlayImage)

    if annotate == 'a':
        # splitext() splits on the last dot only, so names with several dots
        # or none still give a sensible annotation file name.
        stem, extension = os.path.splitext(mapFile)
        annotationName = stem + '-annotated' + extension
        try:
            annotationImage = Image.open('maps/' + annotationName).convert("RGBA")
        except FileNotFoundError:
            # The annotation layer is optional.
            pass
        else:
            mapimage = Image.alpha_composite(mapimage, annotationImage)

    return serveImage(mapimage)
@app.route(baseURL+'/getfile/<filename>')
def getfile(filename):
    """Serve a datasheet PDF from ``/srv/datasheets/``.

    Only names made of word characters and dashes followed by a literal
    ``.pdf`` extension (any letter case) are accepted; anything else gets
    the plain-text refusal message.
    """
    # fullmatch() anchors both ends; the dot is escaped and the character
    # classes contain only the intended letters.
    if re.fullmatch(r'[\w\-]+\.[pP][dD][fF]', filename) is None:
        return 'No injections pls.'
    return send_from_directory('/srv/datasheets/', filename)
def _store_datasheet_upload(datasheet_file):
    """Save an uploaded datasheet in /srv/datasheets/ and return its public URL."""
    datasheet_filename = secure_filename(datasheet_file.filename)
    if not datasheet_filename:
        # Nothing usable is left of the client-supplied name.
        abort(400)
    stem, extension = os.path.splitext(datasheet_filename)
    i = 1
    # Pick the first free "<stem><n><ext>" name so an existing file is never
    # overwritten.
    while os.path.isfile('/srv/datasheets/' + datasheet_filename):
        datasheet_filename = stem + str(i) + extension
        i += 1
    datasheet_file.save('/srv/datasheets/' + datasheet_filename)
    return 'http://elab.kth.se/parts/getfile/' + datasheet_filename


def _remove_stored_datasheet(datasheet_url):
    """Delete the local file behind a datasheet URL that this service issued."""
    # URLs pointing elsewhere have no file in /srv/datasheets/; a same-named
    # local file would belong to a different part, so leave those alone.
    if not datasheet_url or not datasheet_url.startswith('http://elab.kth.se/parts/getfile/'):
        return
    actual_filename = datasheet_url[datasheet_url.rfind('/') + 1:]
    if actual_filename in ('', '.', '..'):
        return
    try:
        os.remove('/srv/datasheets/' + actual_filename)
    except FileNotFoundError:
        # Already gone; nothing to clean up.
        pass


@app.route(baseURL+'/alter/<partID>', methods=['POST'])
@requires_auth
def alter(partID):
    """Create or update a part from the posted form and return its id as JSON.

    A negative ``partID`` inserts a new part owned by ``session['uid']``; any
    other value updates the part with that id (404 when it does not exist).

    The datasheet is taken from the uploaded ``datasheet-file`` when the
    request carries files, otherwise from the ``datasheet-url`` form field.
    On update without either, the stored datasheet is kept; when a new file
    is uploaded the previously stored local file is removed.
    """
    partID = int(partID)
    has_upload = len(request.files) != 0

    if partID < 0:
        # New entry
        s = 'insert into parts (partno, description, datasheet, location_id, whoadded, notes) '
        s += 'values (:partno, :description, :datasheet, :location_id, :user_id, :notes) returning id;'
        if has_upload:
            datasheet_filename = _store_datasheet_upload(request.files['datasheet-file'])
        else:
            datasheet_filename = request.form.get('datasheet-url')
        r = db_engine.execute(text(s), partno=request.form['partno'],
                              description=request.form['description'],
                              datasheet=datasheet_filename,
                              location_id=request.form['location_id'],
                              notes=request.form['notes'],
                              user_id=session['uid'])
    else:
        # Modify entry
        r = db_engine.execute(text('select * from parts where id=:id;'), id=partID)
        existing = r.fetchone()
        r.close()
        if existing is None:
            abort(404)
        previous_datasheet = dict(existing)['datasheet']

        s = 'update parts '
        s += 'set partno=:partno, description=:description, datasheet=:datasheet, location_id=:location_id, notes=:notes '
        s += 'where id=:id returning id;'
        if has_upload:
            datasheet_filename = _store_datasheet_upload(request.files['datasheet-file'])
            _remove_stored_datasheet(previous_datasheet)
        else:
            datasheet_filename = request.form.get('datasheet-url', previous_datasheet)
        r = db_engine.execute(text(s), partno=request.form['partno'],
                              description=request.form['description'],
                              datasheet=datasheet_filename,
                              location_id=request.form['location_id'],
                              id=partID,
                              notes=request.form['notes'])

    returned = r.fetchone()
    r.close()
    if returned is None:
        # The row vanished between the lookup and the update.
        abort(404)
    new_id = returned[0]
    return '{"status":"ok", "part_id" : ' + str(new_id) + '}'
@app.route(baseURL+'/delete/<partID>')
@requires_auth
def delete(partID):
    """Delete the part with the given id.

    Responds with 400 when ``partID`` is not a non-negative integer.
    """
    try:
        part_id = int(partID)
    except ValueError:
        abort(400)
    if part_id < 0:
        abort(400)
    r = db_engine.execute(text('delete from parts where id=:id;'), id=part_id)
    r.close()
    return '{"status":"ok"}'
@app.route(baseURL+'/deleteLocation/<locationID>')
@requires_auth
def deleteLocation(locationID):
    """Delete the location with the given id.

    Responds with 400 when ``locationID`` is not a non-negative integer.
    """
    try:
        location_id = int(locationID)
    except ValueError:
        abort(400)
    if location_id < 0:
        abort(400)
    r = db_engine.execute(text('delete from locations where id=:id;'), id=location_id)
    r.close()
    return '{"status":"ok"}'
@app.route(baseURL+'/deleteUser/<userID>')
@requires_auth
def deleteUser(userID):
    """Delete the user with the given id.

    Responds with 400 when ``userID`` is not a non-negative integer.
    """
    try:
        user_id = int(userID)
    except ValueError:
        abort(400)
    if user_id < 0:
        abort(400)
    r = db_engine.execute(text('delete from users where id=:id;'), id=user_id)
    r.close()
    return '{"status":"ok"}'
@app.route(baseURL+'/fetchOctopartSnippet/<searchTerm>')
def fetchOctopartSnippet(searchTerm):
    """Look up ``searchTerm`` on Octopart and return the first snippet as JSON.

    Returns a JSON object whose ``result`` is ``"ok"`` (with a ``snippet``
    field) on success, or a human-readable message when the integration is
    disabled or nothing usable came back.
    """
    if octopartURL == '':
        return '{"result":"octopart integration not enabled"}'
    args = {
        'q': searchTerm,
        'start': '0',
        'limit': 1
    }
    data = requests.get(octopartURL, params=args)
    search_response = json.loads(data.text)
    if not search_response['hits'] > 0:
        return '{"result":"no results. sorry :(("}'

    try:
        snippet = search_response['results'][0]['snippet']
    except (TypeError, KeyError, IndexError):
        snippet = None
    if not isinstance(snippet, str):
        return '{"result":"no results?"}'

    # json.dumps() escapes quotes, backslashes and control characters in the
    # snippet, which plain string concatenation would pass through and
    # thereby emit invalid JSON.
    return json.dumps({"result": "ok", "snippet": snippet})
def connect(user, password, db, host='localhost', port=5432):
    '''Open a PostgreSQL engine for ``db`` and reflect its schema.

    Returns an ``(engine, metadata)`` tuple: the SQLAlchemy engine created
    from the PostgreSQL URL and a ``MetaData`` object bound to it, populated
    by reflection.
    '''
    url = 'postgresql://{}:{}@{}:{}/{}'.format(user, password, host, port, db)
    engine = sqlalchemy.create_engine(url, client_encoding='utf8')
    metadata = sqlalchemy.MetaData(bind=engine)
    metadata.reflect(bind=engine)
    return engine, metadata
if __name__ == '__main__':
    # NOTE(review): the session-signing key is hard-coded in source; anyone
    # who can read this file can forge session cookies.  Consider loading it
    # from configuration instead.
    app.secret_key = 'asuiygdiahsdo[ainsfl]asfkjnb;asklnj'
    app.config['SESSION_TYPE'] = 'memcached'

    # Database credentials live in admin.json ('username' / 'password').
    with open('admin.json') as f:
        postgres_credentials = json.load(f)
    db_engine, db_metadata = connect(postgres_credentials['username'], postgres_credentials['password'], 'parts_v3')
    parts = sqlalchemy.Table('parts', db_metadata)

    # The Octopart integration is optional: it is enabled only when
    # octopartAPIkey.json exists and holds a non-empty 'key'.
    try:
        with open('octopartAPIkey.json') as f:
            octopart_config = json.load(f)
    except FileNotFoundError:
        octopart_config = {}

    if octopart_config.get('key'):
        octopartURL = octopart_config['URL'] + octopart_config['key']
        print("Octopart credentials loaded.")
    else:
        print("NO OCTOPART KEY FOUND. ABANDONING THAT PART")

    app.run(host='127.0.0.1', port=5000, debug=False)