You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
490 lines
16 KiB
490 lines
16 KiB
import os
|
|
import re
|
|
import json
|
|
import requests
|
|
import sqlalchemy
|
|
from functools import wraps
|
|
from sqlalchemy.sql import select
|
|
from sqlalchemy.sql import text
|
|
from flask import Flask
|
|
from flask import render_template, send_from_directory, request, Response, send_file, session, jsonify
|
|
from PIL import Image, ImageDraw
|
|
from io import BytesIO
|
|
from os import listdir
|
|
from os.path import isfile, join
|
|
from werkzeug.utils import secure_filename
|
|
import random
|
|
|
|
app = Flask(__name__)
|
|
|
|
db_engine = {}
|
|
db_metadata = {}
|
|
parts = {}
|
|
octopartURL =""
|
|
baseURL = "/parts"
|
|
|
|
taglines = []
|
|
with open('taglines.txt', 'r') as infile:
|
|
for line in infile:
|
|
taglines.append(line[:-1])
|
|
|
|
def getContainers():
|
|
query = "select id, name from containers order by UPPER(name);"
|
|
# r = db_engine.execute(text(query))
|
|
# containers = []
|
|
# for row in r:
|
|
# containers.append(dict(row))
|
|
# r.close()
|
|
with db_engine.connect() as conn:
|
|
r = conn.execute(text(query))
|
|
containers = r.mappings().all()
|
|
return containers
|
|
|
|
def check_auth(username, password):
|
|
query = "select id, password from users where username=:usrnm;"
|
|
#r = db_engine.execute(text(query), usrnm=username) #SQLalchemy 1.x
|
|
with db_engine.connect() as conn:
|
|
r = conn.execute(text(query))
|
|
results = r.mappings().all()
|
|
if len(results)!=1:
|
|
return False;
|
|
|
|
if results[0]['password']==password:
|
|
session['uid'] = results[0]['id']
|
|
print (session['uid'])
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def authenticate():
|
|
return Response('Could not verify access level. Please retry', 401, {'WWW-Authenticate' : 'Basic realm="Login Required"'})
|
|
|
|
def requires_auth(f):
|
|
@wraps(f)
|
|
def decorated(*args, **kwargs):
|
|
auth = request.authorization
|
|
if not auth or not check_auth(auth.username, auth.password):
|
|
return authenticate()
|
|
return f(*args, **kwargs)
|
|
return decorated
|
|
|
|
def serveImage(img):
|
|
img_io = BytesIO()
|
|
img.convert("RGB").save(img_io, 'PNG')
|
|
img_io.seek(0)
|
|
return send_file(img_io, mimetype='image/png')
|
|
|
|
@app.route(baseURL, strict_slashes=True)
|
|
def index():
|
|
return render_template('partsearch.html', containers=getContainers(), baseURL=baseURL, tagline=random.choice(taglines))
|
|
|
|
@app.route(baseURL+'/getlocationsInContainer/<containerID>')
|
|
def get_locations_in_container(containerID):
|
|
s = 'select id, name from locations where container_id = :id order by name;'
|
|
# r = db_engine.execute(text(s), id=containerID)
|
|
# l={}
|
|
# for row in r:
|
|
# l[str(row[0])]= row[1];
|
|
# r.close()
|
|
# return json.dumps(l)
|
|
with db_engine.connect() as conn:
|
|
r = conn.execute(text(s), {"id": containerID})
|
|
l = {str(row.id): row.name for row in r}
|
|
|
|
return jsonify(l)
|
|
@app.route(baseURL+'/getlocationURL/<locationID>')
|
|
def get_locationURL(locationID):
|
|
s = 'select map from locations where id = :id;'
|
|
# r = db_engine.execute(text(s), id=locationID)
|
|
# l=[];
|
|
# for row in r:
|
|
# l.append(row);
|
|
# r.close()
|
|
# return l[0][0]
|
|
with db_engine.connect() as conn:
|
|
result = conn.execute(text(s), {"id": locationID}).fetchone()
|
|
|
|
if result is None:
|
|
return ""
|
|
|
|
return result[0]
|
|
|
|
@app.route(baseURL+'/locationEditor')
|
|
def locationEditor():
|
|
query = 'select c.name as container, l.name as name, l.id, c.id as container_id from locations as l inner join containers as c on l.container_id = c.id order by container, name;'
|
|
# r = db_engine.execute(text(query))
|
|
# locations = []
|
|
# for row in r:
|
|
# locations.append(dict(row))
|
|
# r.close()
|
|
# return render_template('locationEditor.html', locations=locations, containers=getContainers(),baseURL=baseURL)
|
|
|
|
with db_engine.connect() as conn:
|
|
result = conn.execute(text(query))
|
|
locations = [dict(row._mapping) for row in result]
|
|
|
|
return render_template(
|
|
'locationEditor.html',
|
|
locations=locations,
|
|
containers=getContainers(),
|
|
baseURL=baseURL
|
|
)
|
|
@app.route(baseURL+'/userEditor')
|
|
def userEditor():
|
|
query = 'select id, username from users;'
|
|
|
|
# r = db_engine.execute(text(query))
|
|
# users = []
|
|
# for row in r:
|
|
# users.append(dict(row))
|
|
# r.close()
|
|
with db_engine.connect() as conn:
|
|
result = conn.execute(text(query))
|
|
users = [dict(row._mapping) for row in result]
|
|
|
|
return render_template('userEditor.html', users=users, containers=getContainers(),baseURL=baseURL)
|
|
|
|
@app.route(baseURL + '/alterLocation/<locationID>', methods=['POST'])
|
|
@requires_auth
|
|
def alterLocation(locationID):
|
|
locationID = int(locationID)
|
|
|
|
if locationID < 0:
|
|
# New entry
|
|
query = text("""
|
|
insert into locations (name, container_id)
|
|
values (:name, :container)
|
|
""")
|
|
|
|
with db_engine.begin() as conn:
|
|
conn.execute(query, {
|
|
"name": request.form['name'],
|
|
"container": request.form['container']
|
|
})
|
|
|
|
return jsonify(status="ok")
|
|
|
|
else:
|
|
# Modify entry
|
|
query = text("""
|
|
update locations
|
|
set name=:name,
|
|
container_id=:container
|
|
where id=:locationID
|
|
""")
|
|
|
|
with db_engine.begin() as conn:
|
|
conn.execute(query, {
|
|
"name": request.form['name'],
|
|
"container": request.form['container'],
|
|
"locationID": locationID
|
|
})
|
|
|
|
return jsonify(status="ok")
|
|
|
|
@app.route(baseURL + '/alterUser/<userID>', methods=['POST'])
|
|
@requires_auth
|
|
def alterUser(userID):
|
|
userID = int(userID)
|
|
|
|
if userID < 0:
|
|
# New entry
|
|
query = text("""
|
|
insert into users (username, password)
|
|
values (:name, :password)
|
|
""")
|
|
|
|
with db_engine.begin() as conn:
|
|
conn.execute(query, {
|
|
"name": request.form['name'],
|
|
"password": request.form['password']
|
|
})
|
|
|
|
return jsonify(status="ok")
|
|
|
|
else:
|
|
# Modify entry
|
|
query = text("""
|
|
update users
|
|
set username=:name, password=:password
|
|
where id=:userID
|
|
""")
|
|
|
|
with db_engine.begin() as conn:
|
|
conn.execute(query, {
|
|
"name": request.form['name'],
|
|
"password": request.form['password'],
|
|
"userID": userID
|
|
})
|
|
|
|
return jsonify(status="ok")
|
|
@app.route(baseURL+'/getpartinfo/<partID>')
|
|
def get_part_info(partID):
|
|
s = 'select p.id,partno,description,notes, c.name || l.name as location_descriptor, location_id, container_id, datasheet from parts as p inner join locations as l on p.location_id = l.id inner join containers as c on l.container_id = c.id where p.id = :id;'
|
|
with db_engine.connect() as conn:
|
|
result = conn.execute(text(s), {"id": partID})
|
|
row = result.mappings().first()
|
|
|
|
return jsonify(dict(row) if row else {})
|
|
@app.route(baseURL+'/query/<filter_dummy>/<query>') # TODO: maybe change AND to OR or maybe not
|
|
def query(filter_dummy, query):
|
|
filter = request.args.to_dict()
|
|
keywords = query.split() # Default splits with spaces
|
|
for i in range(len(keywords)):
|
|
keywords[i] = '%' + keywords[i] + '%'
|
|
kw_dict = {}
|
|
for i in range(len(keywords)):
|
|
kw_dict["kw" + str(i)] = keywords[i]
|
|
|
|
s = 'select p.id,partno,description, c.name || l.name as location_descriptor from parts as p inner join locations as l on p.location_id = l.id inner join containers as c on l.container_id = c.id where '
|
|
|
|
if filter['l'] == 'true':
|
|
s += '('
|
|
for i in range(len(keywords)):
|
|
s += 'LOWER(c.name||l.name) like LOWER(:kw'+ str(i) +') AND '
|
|
s = s[:-5]
|
|
s += ') OR '
|
|
if filter['p'] == 'true':
|
|
s += '('
|
|
for i in range(len(keywords)):
|
|
s += 'LOWER(partno) like LOWER(:kw'+ str(i) +') AND '
|
|
s = s[:-5]
|
|
s += ') OR '
|
|
if filter['d'] == 'true':
|
|
s += '('
|
|
for i in range(len(keywords)):
|
|
s += 'LOWER(description) like LOWER(:kw'+ str(i) +') AND '
|
|
s = s[:-5]
|
|
s += ') OR '
|
|
s = s[:-4] + ';'
|
|
# s = text(s)
|
|
# r = db_engine.execute(s, kw_dict)
|
|
# l = []
|
|
# for row in r:
|
|
# l.append(dict(row))
|
|
# r.close()
|
|
with db_engine.connect() as conn:
|
|
l = [dict(row) for row in conn.execute(text(s), kw_dict).mappings()]
|
|
return json.dumps(l)
|
|
|
|
@app.route(baseURL+'/map/<containerID>')
|
|
@app.route(baseURL+'/map/<containerID>/<annotate>')
|
|
def getMap(containerID, annotate=None):
|
|
s = 'select map, overlay from containers where id = :id;'
|
|
with db_engine.connect() as conn:
|
|
row = conn.execute(text(s), {"id": containerID}).mappings().first()
|
|
mapFile = row['map']
|
|
overlayFile = row['overlay']
|
|
try:
|
|
mapImage = Image.open('maps/' + mapFile).convert("RGBA")
|
|
overlayImage = Image.open('maps/overlays/' + overlayFile).convert("RGBA")
|
|
except FileNotFoundError:
|
|
return serveImage(Image.open("maps/404.png"))
|
|
# if request.args.get('x') is not None and request.args.get('y') is not None:
|
|
# x = int(request.args.get('x'))
|
|
# y = int(request.args.get('y'))
|
|
# pointer = Image.open('maps/here.png')
|
|
# pointerLayer = Image.new("RGBA", mapimage.size)
|
|
# width, height = pointer.size
|
|
# pointerLayer.paste(pointer, (x - int(width/2), y-int(height/2)))
|
|
mapimage = Image.alpha_composite(mapImage, overlayImage)
|
|
if annotate=='a':
|
|
try:
|
|
annotationName = mapFile.split('.')[0] + '-annotated.' + mapFile.split('.')[1]
|
|
annotationImage = Image.open('maps/' + annotationName).convert("RGBA")
|
|
mapimage = Image.alpha_composite(mapimage, annotationImage)
|
|
except FileNotFoundError:
|
|
pass
|
|
return serveImage(mapimage)
|
|
|
|
|
|
@app.route(baseURL+'/getfile/<filename>')
|
|
def getfile(filename):
|
|
# if(re.match('^[\w\-_]+.[p|P][d|D][f|F]$', filename) == None):
|
|
if(re.match(r'^[\w-]+\.pdf$', filename, re.IGNORECASE) == None):
|
|
return 'No injections pls.'
|
|
|
|
return send_from_directory('/srv/datasheets/', filename)
|
|
|
|
@app.route(baseURL + '/alter/<partID>', methods=['POST'])
|
|
@requires_auth
|
|
def alter(partID):
|
|
partID = int(partID)
|
|
|
|
# -------------------------
|
|
# HANDLE FILE UPLOAD
|
|
# -------------------------
|
|
if len(request.files) != 0:
|
|
datasheet_file = request.files['datasheet-file']
|
|
datasheet_filename = secure_filename(datasheet_file.filename)
|
|
|
|
i = 1
|
|
while os.path.isfile('/srv/datasheets/' + datasheet_filename):
|
|
datasheet_filename = datasheet_filename[:-4] + str(i) + '.pdf'
|
|
i += 1
|
|
|
|
datasheet_file.save('/srv/datasheets/' + datasheet_filename)
|
|
datasheet_filename = 'http://elab.kth.se/parts/getfile/' + datasheet_filename
|
|
else:
|
|
try:
|
|
datasheet_filename = request.form['datasheet-url']
|
|
except:
|
|
datasheet_filename = None
|
|
|
|
# -------------------------
|
|
# INSERT
|
|
# -------------------------
|
|
if partID < 0:
|
|
query = text("""
|
|
insert into parts
|
|
(partno, description, datasheet, location_id, whoadded, notes)
|
|
values
|
|
(:partno, :description, :datasheet, :location_id, :user_id, :notes)
|
|
returning id
|
|
""")
|
|
|
|
with db_engine.begin() as conn:
|
|
result = conn.execute(query, {
|
|
"partno": request.form['partno'],
|
|
"description": request.form['description'],
|
|
"datasheet": datasheet_filename,
|
|
"location_id": request.form['location_id'],
|
|
"notes": request.form['notes'],
|
|
"user_id": session['uid']
|
|
})
|
|
|
|
new_id = result.mappings().first()["id"]
|
|
|
|
return jsonify(status="ok", part_id=new_id)
|
|
|
|
# -------------------------
|
|
# UPDATE
|
|
# -------------------------
|
|
else:
|
|
query = text("""
|
|
update parts
|
|
set partno=:partno,
|
|
description=:description,
|
|
datasheet=:datasheet,
|
|
location_id=:location_id,
|
|
notes=:notes
|
|
where id=:id
|
|
returning id
|
|
""")
|
|
|
|
with db_engine.begin() as conn:
|
|
result = conn.execute(query, {
|
|
"partno": request.form['partno'],
|
|
"description": request.form['description'],
|
|
"datasheet": datasheet_filename,
|
|
"location_id": request.form['location_id'],
|
|
"notes": request.form['notes'],
|
|
"id": partID
|
|
})
|
|
|
|
new_id = result.mappings().first()["id"]
|
|
|
|
return jsonify(status="ok", part_id=new_id)
|
|
@app.route(baseURL + '/delete/<partID>')
|
|
@requires_auth
|
|
def delete(partID):
|
|
partID = int(partID)
|
|
if partID < 0:
|
|
abort(400)
|
|
|
|
query = text('delete from parts where id=:id')
|
|
|
|
with db_engine.begin() as conn:
|
|
conn.execute(query, {"id": partID})
|
|
|
|
return jsonify(status="ok")
|
|
|
|
|
|
@app.route(baseURL + '/deleteLocation/<locationID>')
|
|
@requires_auth
|
|
def deleteLocation(locationID):
|
|
locationID = int(locationID)
|
|
if locationID < 0:
|
|
abort(400)
|
|
|
|
query = text('delete from locations where id=:id')
|
|
|
|
with db_engine.begin() as conn:
|
|
conn.execute(query, {"id": locationID})
|
|
|
|
return jsonify(status="ok")
|
|
|
|
|
|
@app.route(baseURL + '/deleteUser/<userID>')
|
|
@requires_auth
|
|
def deleteUser(userID):
|
|
userID = int(userID)
|
|
if userID < 0:
|
|
abort(400)
|
|
|
|
query = text('delete from users where id=:id')
|
|
|
|
with db_engine.begin() as conn:
|
|
conn.execute(query, {"id": userID})
|
|
|
|
return jsonify(status="ok")
|
|
|
|
@app.route(baseURL+'/fetchOctopartSnippet/<searchTerm>')
|
|
def fetchOctopartSnippet(searchTerm):
|
|
if octopartURL == '':
|
|
return '{"result":"octopart integration not enabled"}'
|
|
args = {
|
|
'q': searchTerm,
|
|
'start': '0',
|
|
'limit': 1
|
|
}
|
|
data = requests.get(octopartURL, params=args)
|
|
search_response = json.loads(data.text)
|
|
|
|
result = '{"result":"no results. sorry :(("}'
|
|
if search_response['hits']>0:
|
|
try:
|
|
result = '{"result":"ok", "snippet":"' + (search_response['results'][0]['snippet']) + '"}';
|
|
except TypeError:
|
|
result = '{"result":"no results?"}'
|
|
return result
|
|
|
|
def connect(user, password, db, host='localhost', port=5432):
|
|
'''Returns a connection and a metadata object'''
|
|
# We connect with the help of the PostgreSQL URL
|
|
url = 'postgresql://{}:{}@{}:{}/{}'
|
|
url = url.format(user, password, host, port, db)
|
|
|
|
# The return value of create_engine() is our connection object
|
|
con = sqlalchemy.create_engine(url, client_encoding='utf8')
|
|
|
|
# We then bind the connection to MetaData()
|
|
meta = sqlalchemy.MetaData()
|
|
meta.reflect(con)
|
|
|
|
return con, meta
|
|
|
|
if __name__ == '__main__':
|
|
app.secret_key = 'asuiygdiahsdo[ainsfl]asfkjnb;asklnj'
|
|
app.config['SESSION_TYPE'] = 'memcached'
|
|
with open('admin.json') as f:
|
|
postgres_credentials = json.load(f)
|
|
db_engine, db_metadata = connect(postgres_credentials['username'], postgres_credentials['password'], 'parts_v3')
|
|
parts = sqlalchemy.Table('parts', db_metadata)
|
|
try:
|
|
with open('octopartAPIkey.json') as f:
|
|
j = json.load(f);
|
|
if j['key'] != '':
|
|
octopartURL = j['URL'] + j['key']
|
|
print ("Octopart credentials loaded.")
|
|
else:
|
|
raise FileNotFoundError
|
|
except FileNotFoundError:
|
|
print ("NO OCTOPART KEY FOUND. ABANDONING THAT PART")
|
|
# Example query
|
|
'''s = select([parts]).where(parts.c.notes != '')
|
|
with db_engine.connect() as conn:
|
|
r = conn.execute(text(query))
|
|
'''
|
|
app.run(host='127.0.0.1', port='5000', debug=False)
|