From 43006ce150f0167bb355b0059943de17c7d84601 Mon Sep 17 00:00:00 2001 From: Marek Date: Wed, 13 May 2026 13:20:09 +0200 Subject: [PATCH] Updated to sqlalchemy 2.x, as well as newer python version. New syntax to query execution, as well as some string literal comparison stuff --- parts/server.py | 430 ++++++++++++++++++++++++++++-------------------- 1 file changed, 255 insertions(+), 175 deletions(-) diff --git a/parts/server.py b/parts/server.py index 6ba1f44..a8b6544 100755 --- a/parts/server.py +++ b/parts/server.py @@ -7,7 +7,7 @@ from functools import wraps from sqlalchemy.sql import select from sqlalchemy.sql import text from flask import Flask -from flask import render_template, send_from_directory, request, Response, send_file, session +from flask import render_template, send_from_directory, request, Response, send_file, session, jsonify from PIL import Image, ImageDraw from io import BytesIO from os import listdir @@ -30,20 +30,22 @@ with open('taglines.txt', 'r') as infile: def getContainers(): query = "select id, name from containers order by UPPER(name);" - r = db_engine.execute(text(query)) - containers = [] - for row in r: - containers.append(dict(row)) - r.close() +# r = db_engine.execute(text(query)) +# containers = [] +# for row in r: +# containers.append(dict(row)) +# r.close() + with db_engine.connect() as conn: + r = conn.execute(text(query)) + containers = r.mappings().all() return containers def check_auth(username, password): query = "select id, password from users where username=:usrnm;" - r = db_engine.execute(text(query), usrnm=username) - results = [] - for row in r: - results.append(dict(row)) - r.close() + #r = db_engine.execute(text(query), usrnm=username) #SQLalchemy 1.x + with db_engine.connect() as conn: + r = conn.execute(text(query)) + results = r.mappings().all() if len(results)!=1: return False; @@ -79,99 +81,151 @@ def index(): @app.route(baseURL+'/getlocationsInContainer/') def get_locations_in_container(containerID): s = 'select id, name from locations where container_id = :id order by name;' - r = db_engine.execute(text(s), id=containerID) - l={} - for row in r: - l[str(row[0])]= row[1]; - r.close() - return json.dumps(l) - +# r = db_engine.execute(text(s), id=containerID) +# l={} +# for row in r: +# l[str(row[0])]= row[1]; +# r.close() +# return json.dumps(l) + with db_engine.connect() as conn: + r = conn.execute(text(s), {"id": containerID}) + l = {str(row.id): row.name for row in r} + + return jsonify(l) @app.route(baseURL+'/getlocationURL/') def get_locationURL(locationID): s = 'select map from locations where id = :id;' - r = db_engine.execute(text(s), id=locationID) - l=[]; - for row in r: - l.append(row); - r.close() - return l[0][0] +# r = db_engine.execute(text(s), id=locationID) +# l=[]; +# for row in r: +# l.append(row); +# r.close() +# return l[0][0] + with db_engine.connect() as conn: + result = conn.execute(text(s), {"id": locationID}).fetchone() + + if result is None: + return "" + + return result[0] @app.route(baseURL+'/locationEditor') def locationEditor(): query = 'select c.name as container, l.name as name, l.id, c.id as container_id from locations as l inner join containers as c on l.container_id = c.id order by container, name;' - r = db_engine.execute(text(query)) - locations = [] - for row in r: - locations.append(dict(row)) - r.close() - return render_template('locationEditor.html', locations=locations, containers=getContainers(),baseURL=baseURL) +# r = db_engine.execute(text(query)) +# locations = [] +# for row in r: +# locations.append(dict(row)) +# r.close() +# return render_template('locationEditor.html', locations=locations, containers=getContainers(),baseURL=baseURL) + + with db_engine.connect() as conn: + result = conn.execute(text(query)) + locations = [dict(row._mapping) for row in result] + + return render_template( + 'locationEditor.html', + locations=locations, + containers=getContainers(), + baseURL=baseURL + ) @app.route(baseURL+'/userEditor') def userEditor(): - query = 'select c.name as container, l.name as name, l.id, c.id as container_id from locations as l inner join containers as c on l.container_id = c.id order by container, name;' query = 'select id, username from users;' - r = db_engine.execute(text(query)) - users = [] - for row in r: - users.append(dict(row)) - r.close() +# r = db_engine.execute(text(query)) +# users = [] +# for row in r: +# users.append(dict(row)) +# r.close() + with db_engine.connect() as conn: + result = conn.execute(text(query)) + users = [dict(row._mapping) for row in result] + return render_template('userEditor.html', users=users, containers=getContainers(),baseURL=baseURL) -@app.route(baseURL+'/alterLocation/', methods=['POST']) +@app.route(baseURL + '/alterLocation/', methods=['POST']) @requires_auth def alterLocation(locationID): locationID = int(locationID) - s = '' + if locationID < 0: # New entry - s = 'insert into locations (name, container_id) ' - s += 'values (:name, :container);' - s = text(s) - r = db_engine.execute(s,name=request.form['name'],container=request.form['container']); - r.close() - return '{"status":"ok"}' + query = text(""" + insert into locations (name, container_id) + values (:name, :container) + """) + + with db_engine.begin() as conn: + conn.execute(query, { + "name": request.form['name'], + "container": request.form['container'] + }) + + return jsonify(status="ok") + else: # Modify entry - s = 'update locations ' - s += 'set name=:name, container_id=:container ' - s += 'where id=:locationID;' - s = text(s) - r = db_engine.execute(s, name=request.form['name'],container=request.form['container'],locationID=locationID); - r.close() - return '{"status":"ok"}' -@app.route(baseURL+'/alterUser/', methods=['POST']) + query = text(""" + update locations + set name=:name, + container_id=:container + where id=:locationID + """) + + with db_engine.begin() as conn: + conn.execute(query, { + "name": request.form['name'], + "container": request.form['container'], + "locationID": locationID + }) + + return jsonify(status="ok") + +@app.route(baseURL + '/alterUser/', methods=['POST']) @requires_auth def alterUser(userID): userID = int(userID) - s = '' + if userID < 0: # New entry - s = 'insert into users (username, password) ' - s += 'values (:name, :password);' - s = text(s) - r = db_engine.execute(s,name=request.form['name'],password=request.form['password']); - r.close() - return '{"status":"ok"}' + query = text(""" + insert into users (username, password) + values (:name, :password) + """) + + with db_engine.begin() as conn: + conn.execute(query, { + "name": request.form['name'], + "password": request.form['password'] + }) + + return jsonify(status="ok") + else: # Modify entry - s = 'update users ' - s += 'set username=:name, password=:password ' - s += 'where id=:userID;' - s = text(s) - r = db_engine.execute(s, name=request.form['name'],password=request.form['password'],userID=userID); - r.close() - return '{"status":"ok"}' - + query = text(""" + update users + set username=:name, password=:password + where id=:userID + """) + + with db_engine.begin() as conn: + conn.execute(query, { + "name": request.form['name'], + "password": request.form['password'], + "userID": userID + }) + + return jsonify(status="ok") @app.route(baseURL+'/getpartinfo/') def get_part_info(partID): s = 'select p.id,partno,description,notes, c.name || l.name as location_descriptor, location_id, container_id, datasheet from parts as p inner join locations as l on p.location_id = l.id inner join containers as c on l.container_id = c.id where p.id = :id;' - r = db_engine.execute(text(s), id=partID) - l = [] - for row in r: - l.append(dict(row)) - r.close() - return json.dumps(l[0]) + with db_engine.connect() as conn: + result = conn.execute(text(s), {"id": partID}) + row = result.mappings().first() + return jsonify(dict(row) if row else {}) @app.route(baseURL+'/query//') # TODO: maybe change AND to OR or maybe not def query(filter_dummy, query): filter = request.args.to_dict() @@ -203,24 +257,24 @@ def query(filter_dummy, query): s = s[:-5] s += ') OR ' s = s[:-4] + ';' - s = text(s) - r = db_engine.execute(s, kw_dict) - l = [] - for row in r: - l.append(dict(row)) - r.close() +# s = text(s) +# r = db_engine.execute(s, kw_dict) +# l = [] +# for row in r: +# l.append(dict(row)) +# r.close() + with db_engine.connect() as conn: + l = [dict(row) for row in conn.execute(text(s), kw_dict).mappings()] return json.dumps(l) @app.route(baseURL+'/map/') @app.route(baseURL+'/map//') def getMap(containerID, annotate=None): s = 'select map, overlay from containers where id = :id;' - r = db_engine.execute(text(s), id=containerID) - l = [] - for row in r: - l.append(dict(row)) - mapFile = l[0]['map'] - overlayFile = l[0]['overlay'] + with db_engine.connect() as conn: + row = conn.execute(text(s), {"id": containerID}).mappings().first() + mapFile = row['map'] + overlayFile = row['overlay'] try: mapImage = Image.open('maps/' + mapFile).convert("RGBA") overlayImage = Image.open('maps/overlays/' + overlayFile).convert("RGBA") @@ -246,109 +300,134 @@ def getMap(containerID, annotate=None): @app.route(baseURL+'/getfile/') def getfile(filename): - if(re.match('^[\w\-_]+.[p|P][d|D][f|F]$', filename) == None): +# if(re.match('^[\w\-_]+.[p|P][d|D][f|F]$', filename) == None): + if(re.match(r'^[\w-]+\.pdf$', filename, re.IGNORECASE) == None): return 'No injections pls.' return send_from_directory('/srv/datasheets/', filename) -@app.route(baseURL+'/alter/', methods=['POST']) +@app.route(baseURL + '/alter/', methods=['POST']) @requires_auth def alter(partID): partID = int(partID) - s = '' - r = {} + + # ------------------------- + # HANDLE FILE UPLOAD + # ------------------------- + if len(request.files) != 0: + datasheet_file = request.files['datasheet-file'] + datasheet_filename = secure_filename(datasheet_file.filename) + + i = 1 + while os.path.isfile('/srv/datasheets/' + datasheet_filename): + datasheet_filename = datasheet_filename[:-4] + str(i) + '.pdf' + i += 1 + + datasheet_file.save('/srv/datasheets/' + datasheet_filename) + datasheet_filename = 'http://elab.kth.se/parts/getfile/' + datasheet_filename + else: + try: + datasheet_filename = request.form['datasheet-url'] + except: + datasheet_filename = None + + # ------------------------- + # INSERT + # ------------------------- if partID < 0: - # New entry - s = 'insert into parts (partno, description, datasheet, location_id, whoadded, notes) ' - s += 'values (:partno, :description, :datasheet, :location_id, :user_id, :notes) returning id;' - s = text(s) - if len(request.files) != 0: - datasheet_file = request.files['datasheet-file'] - datasheet_filename = secure_filename(datasheet_file.filename) - i = 1 - while os.path.isfile('/srv/datasheets/' + datasheet_filename): - datasheet_filename = datasheet_filename[:-4] + str(i) + '.pdf' - i += 1 - datasheet_file.save('/srv/datasheets/' + datasheet_filename) - datasheet_filename = 'http://elab.kth.se/parts/getfile/' + datasheet_filename - # elif request.form.has_key('datasheet-url'): - # datasheet_filename = request.form['datasheet-url'] - else: - try: - datasheet_filename = request.form['datasheet-url'] - except: - datasheet_filename = None - r = db_engine.execute(s, partno=request.form['partno'], - description=request.form['description'], - datasheet=datasheet_filename, - location_id=request.form['location_id'], - notes=request.form['notes'], - user_id=session['uid']) + query = text(""" + insert into parts + (partno, description, datasheet, location_id, whoadded, notes) + values + (:partno, :description, :datasheet, :location_id, :user_id, :notes) + returning id + """) + + with db_engine.begin() as conn: + result = conn.execute(query, { + "partno": request.form['partno'], + "description": request.form['description'], + "datasheet": datasheet_filename, + "location_id": request.form['location_id'], + "notes": request.form['notes'], + "user_id": session['uid'] + }) + + new_id = result.mappings().first()["id"] + + return jsonify(status="ok", part_id=new_id) + + # ------------------------- + # UPDATE + # ------------------------- else: - # Modify entry - r = db_engine.execute(text('select * from parts where id=:id;'), id=partID) - l = [] - for row in r: - l.append(dict(row)) - r.close() - s = 'update parts ' - s += 'set partno=:partno, description=:description, datasheet=:datasheet, location_id=:location_id, notes=:notes ' - if len(request.files) != 0: - datasheet_file = request.files['datasheet-file'] - datasheet_filename = secure_filename(datasheet_file.filename) - i = 1 - while os.path.isfile('/srv/datasheets/' + datasheet_filename): - datasheet_filename = datasheet_filename[:-4] + str(i) + '.pdf' - i += 1 - datasheet_file.save('/srv/datasheets/' + datasheet_filename) - datasheet_filename = 'http://elab.kth.se/parts/getfile/' + datasheet_filename - if l[0]['datasheet'] != None: - whole_url = l[0]['datasheet'] - actual_filename = whole_url[whole_url.rfind('/') + 1:] - os.remove('/srv/datasheets/' + actual_filename) - else: - try: - datasheet_filename = request.form['datasheet-url'] - except: - datasheet_filename = l[0]['datasheet'] - s += 'where id=:id returning id;' - s = text(s) - r = db_engine.execute(s, partno=request.form['partno'], - description=request.form['description'], - datasheet=datasheet_filename, - location_id=request.form['location_id'], - id=partID, - notes=request.form['notes']) - - new_id = r.fetchone()[0] - r.close() - return '{"status":"ok", "part_id" : ' + str(new_id) + '}' - -@app.route(baseURL+'/delete/') + query = text(""" + update parts + set partno=:partno, + description=:description, + datasheet=:datasheet, + location_id=:location_id, + notes=:notes + where id=:id + returning id + """) + + with db_engine.begin() as conn: + result = conn.execute(query, { + "partno": request.form['partno'], + "description": request.form['description'], + "datasheet": datasheet_filename, + "location_id": request.form['location_id'], + "notes": request.form['notes'], + "id": partID + }) + + new_id = result.mappings().first()["id"] + + return jsonify(status="ok", part_id=new_id) +@app.route(baseURL + '/delete/') @requires_auth def delete(partID): - if int(partID) < 0: + partID = int(partID) + if partID < 0: abort(400) - s = text('delete from parts where id=:id;') - r = db_engine.execute(s, id=partID) - return '{"status":"ok"}' -@app.route(baseURL+'/deleteLocation/') + query = text('delete from parts where id=:id') + + with db_engine.begin() as conn: + conn.execute(query, {"id": partID}) + + return jsonify(status="ok") + + +@app.route(baseURL + '/deleteLocation/') @requires_auth def deleteLocation(locationID): - if int(locationID) < 0: + locationID = int(locationID) + if locationID < 0: abort(400) - s = text('delete from locations where id=:id;') - r = db_engine.execute(s, id=locationID) - return '{"status":"ok"}' -@app.route(baseURL+'/deleteUser/') + + query = text('delete from locations where id=:id') + + with db_engine.begin() as conn: + conn.execute(query, {"id": locationID}) + + return jsonify(status="ok") + + +@app.route(baseURL + '/deleteUser/') @requires_auth def deleteUser(userID): - if int(userID) < 0: + userID = int(userID) + if userID < 0: abort(400) - s = text('delete from users where id=:id;') - r = db_engine.execute(s, id=userID) - return '{"status":"ok"}' + + query = text('delete from users where id=:id') + + with db_engine.begin() as conn: + conn.execute(query, {"id": userID}) + + return jsonify(status="ok") @app.route(baseURL+'/fetchOctopartSnippet/') def fetchOctopartSnippet(searchTerm): @@ -380,8 +459,8 @@ def connect(user, password, db, host='localhost', port=5432): con = sqlalchemy.create_engine(url, client_encoding='utf8') # We then bind the connection to MetaData() - meta = sqlalchemy.MetaData(bind=con) - meta.reflect(bind=con) + meta = sqlalchemy.MetaData() + meta.reflect(con) return con, meta @@ -395,7 +474,7 @@ if __name__ == '__main__': try: with open('octopartAPIkey.json') as f: j = json.load(f); - if j['key'] is not '': + if j['key'] != '': octopartURL = j['URL'] + j['key'] print ("Octopart credentials loaded.") else: @@ -404,6 +483,7 @@ if __name__ == '__main__': print ("NO OCTOPART KEY FOUND. ABANDONING THAT PART") # Example query '''s = select([parts]).where(parts.c.notes != '') - for row in db_engine.execute(s): - print row''' - app.run(host='127.0.0.1', port='5000', debug=False) + with db_engine.connect() as conn: + r = conn.execute(text(query)) + ''' +app.run(host='127.0.0.1', port='5000', debug=False)