import hmac
import json
import os
import re
from functools import wraps
from io import BytesIO
from os import listdir
from os.path import isfile, join
from urllib.parse import quote

import sqlalchemy
from flask import Flask
from flask import Response, abort, render_template, request, send_file, send_from_directory
from PIL import Image, ImageDraw
from sqlalchemy.sql import select
from sqlalchemy.sql import text
from werkzeug.utils import secure_filename
# Flask application object; the route decorators below register on it.
app = Flask(__name__)

# Module-level placeholders. They are rebound in the __main__ block at the
# bottom of the file once the database connection exists; until that block has
# run, db_engine is an empty dict and every view calling db_engine.execute()
# fails.
db_engine, db_metadata, parts = {}, {}, {}
def getContainers():
    """Return every container as a dict of its ``id`` and ``name`` columns.

    Rows come back ordered by the upper-cased container name.
    """
    result = db_engine.execute(text(" select id, name from containers order by UPPER(name); "))
    rows = [dict(record) for record in result]
    result.close()
    return rows
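def check_auth(username, password):
    """Check a username/password pair against the entries of the admin file.

    Returns True only when an entry with this username exists and its stored
    password equals ``password``. Every other case, including an unknown
    username or a non-string password, returns False (the unknown-user case
    used to fall off the end of the function and return None).

    NOTE(review): the admin file holds the passwords themselves, so anyone who
    can read it learns every credential. Storing salted hashes instead (for
    example werkzeug.security.generate_password_hash / check_password_hash)
    would need a change of file format and is therefore not done here.
    """
    # The default mode of open() is read-only text, which is all that is needed.
    with open(' edit_admin.json ') as admin:
        admin_list = json.load(admin)
    for user in admin_list:
        if username == user[' username ']:
            stored = user[' password ']
            if not isinstance(password, str) or not isinstance(stored, str):
                return False
            # Constant-time comparison: the time taken must not reveal how
            # many leading characters of a guess were correct.
            return hmac.compare_digest(password.encode('utf-8'), stored.encode('utf-8'))
    return False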
def authenticate():
    """Build the 401 response that asks the client for HTTP Basic credentials."""
    challenge_headers = {' WWW-Authenticate ': ' Basic realm= " Login Required " '}
    return Response(' Could not verify access level. Please retry ', 401, challenge_headers)
def requires_auth(f):
    """Decorator: run ``f`` only when the request's credentials pass check_auth.

    A request without an Authorization header, or with credentials that
    check_auth rejects, receives the response built by authenticate() and
    ``f`` is not called.

    NOTE(review): HTTP Basic credentials travel merely base64-encoded, so this
    check protects nothing unless the application is served over TLS.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        if credentials and check_auth(credentials.username, credentials.password):
            return f(*args, **kwargs)
        return authenticate()
    return decorated
def serveImage(img):
    """Send a PIL image to the client as a PNG encoded in memory."""
    buffer = BytesIO()
    rgb_image = img.convert(" RGB ")
    rgb_image.save(buffer, ' PNG ')
    buffer.seek(0)  # rewind so send_file starts reading at the first byte
    return send_file(buffer, mimetype=' image/png ')
@app.route(' /parts ', strict_slashes=False)
def index():
    """Render the part-search page, handing it the list of containers."""
    containers = getContainers()
    return render_template(' partsearch.html ', containers=containers)
@app.route(' /parts/getlocationsInContainer/<containerID> ')
def get_locations_in_container(containerID):
    """Return a JSON object mapping location id to location name for one container.

    ``containerID`` comes straight from the URL; it reaches the database only
    as the bound parameter ``:id``, never as part of the SQL text.
    """
    statement = text(' select id, name from locations where container_id = :id order by name; ')
    result = db_engine.execute(statement, id=containerID)
    names_by_id = {record[0]: record[1] for record in result}
    result.close()
    return json.dumps(names_by_id)
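@app.route(' /parts/getlocationURL/<locationID> ')
def get_locationURL(locationID):
    """Return the ``map`` value stored for a location.

    Responds with 404 when no location has this id or its ``map`` column is
    NULL. Both cases used to end in an unhandled exception (HTTP 500): indexing
    an empty row list, or handing None to Flask as a response body.

    ``locationID`` is passed to the database as the bound parameter ``:id``.
    """
    statement = text(' select map from locations where id = :id; ')
    result = db_engine.execute(statement, id=locationID)
    try:
        record = result.fetchone()
    finally:
        # Release the result even when fetching raises.
        result.close()
    if record is None or record[0] is None:
        abort(404)
    return record[0]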
@app.route(' /parts/locationEditor ')
def locationEditor():
    """Render the location editor.

    The template receives every location joined to its container (ordered by
    container name, then location name) together with the list of containers.
    """
    statement = text(' select c.name as container, l.name as name, l.id, c.id as container_id from locations as l inner join containers as c on l.container_id = c.id order by container, name; ')
    result = db_engine.execute(statement)
    locations = [dict(record) for record in result]
    result.close()
    return render_template(' locationEditor.html ', locations=locations, containers=getContainers())
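@app.route(' /parts/alterLocation/<locationID> ', methods=[' POST '])
# @requires_auth
def alterLocation(locationID):
    """Create a location (negative id) or update an existing one (id >= 0).

    The location name and its container id are read from the POSTed form and
    reach the database as bound parameters. Returns a JSON status string. A
    non-numeric ``locationID`` is answered with 400 instead of an unhandled
    ValueError.

    NOTE(review): the @requires_auth line above is commented out, so this
    write endpoint accepts requests from any client that can reach the server.
    """
    try:
        locationID = int(locationID)
    except ValueError:
        abort(400)

    name = request.form[' name ']
    container = request.form[' container ']
    if locationID < 0:
        # New entry
        statement = text(
            ' insert into locations (name, container_id) '
            ' values (:name, :container); '
        )
        result = db_engine.execute(statement, name=name, container=container)
    else:
        # Modify entry
        statement = text(
            ' update locations '
            ' set name=:name, container_id=:container '
            ' where id=:locationID; '
        )
        result = db_engine.execute(statement, name=name, container=container, locationID=locationID)
    result.close()
    return ' { " status " : " ok " } '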
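@app.route(' /parts/getpartinfo/<partID> ')
def get_part_info(partID):
    """Return one part, joined to its location and container, as a JSON object.

    Responds with 404 when no part has this id; the lookup used to index an
    empty list and fail with HTTP 500. ``partID`` is passed to the database as
    the bound parameter ``:id``.
    """
    statement = text(' select p.id,partno,description, c.name || l.name as location_descriptor, location_id, container_id, datasheet from parts as p inner join locations as l on p.location_id = l.id inner join containers as c on l.container_id = c.id where p.id = :id; ')
    result = db_engine.execute(statement, id=partID)
    try:
        record = result.fetchone()
    finally:
        # Release the result even when fetching raises.
        result.close()
    if record is None:
        abort(404)
    return json.dumps(dict(record))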
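@app.route(' /parts/query/<filter_dummy>/<query> ')  # TODO: maybe change AND to OR or maybe not
def query(filter_dummy, query):
    """Search parts by keyword and return the matches as a JSON list.

    ``query`` is split on whitespace. A field matches when it contains every
    keyword (case-insensitive LIKE); a part is returned when any field
    selected by the query-string flags matches. ``filter_dummy`` is unused.

    Keywords reach the database only as bound parameters. The SQL text is
    assembled from fixed fragments plus the keyword index, so request data
    never becomes part of the statement.

    Changes from the previous version: a missing flag counts as "not
    selected" instead of raising KeyError, and a request with no keywords or
    no selected field returns an empty list. Before, the trailing-slice
    clean-up cut into the statement in those cases and sent malformed SQL.

    NOTE(review): ``%`` and ``_`` inside a keyword are not escaped and
    therefore act as LIKE wildcards.
    """
    flags = request.args.to_dict()
    # split() without arguments splits on any run of whitespace
    keywords = [' % ' + word + ' % ' for word in query.split()]
    kw_dict = {" kw " + str(i): keyword for i, keyword in enumerate(keywords)}

    # (flag name in the query string, comparison opened for that column)
    searchable = (
        (' l ', ' LOWER(l.name) like LOWER(:kw '),
        (' p ', ' LOWER(partno) like LOWER(:kw '),
        (' d ', ' LOWER(description) like LOWER(:kw '),
    )
    groups = []
    for flag, comparison in searchable:
        if flags.get(flag) != ' true ':
            continue
        conditions = [comparison + str(i) + ' ) ' for i in range(len(keywords))]
        groups.append(' ( ' + ' AND '.join(conditions) + ' ) ')

    if not keywords or not groups:
        # Nothing to search for: answer without touching the database.
        return json.dumps([])

    s = ' select p.id,partno,description, c.name || l.name as location_descriptor from parts as p inner join locations as l on p.location_id = l.id inner join containers as c on l.container_id = c.id where '
    s = text(s + ' OR '.join(groups) + ' ; ')
    r = db_engine.execute(s, kw_dict)
    try:
        matches = [dict(row) for row in r]
    finally:
        r.close()
    return json.dumps(matches)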
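@app.route(' /parts/map/<containerID> ')
def getMap(containerID):
    """Serve a container's map with its overlay composited on top, as a PNG.

    The placeholder image is served when the container is unknown, when it has
    no map or overlay recorded, or when one of the image files is missing.
    The first two cases used to raise (IndexError / TypeError) and the query
    result was never closed.

    NOTE(review): the file names come from the containers table and are
    concatenated onto the maps directory as-is. Nothing in this file writes
    those columns; confirm that whatever does write them is trusted.
    """
    statement = text(' select map, overlay from containers where id = :id; ')
    result = db_engine.execute(statement, id=containerID)
    try:
        record = result.fetchone()
    finally:
        result.close()

    if record is None:
        return serveImage(Image.open(" maps/404.png "))
    files = dict(record)
    mapFile = files[' map ']
    overlayFile = files[' overlay ']
    if mapFile is None or overlayFile is None:
        return serveImage(Image.open(" maps/404.png "))

    try:
        mapImage = Image.open(' maps/ ' + mapFile).convert(" RGBA ")
        overlayImage = Image.open(' maps/overlays/ ' + overlayFile).convert(" RGBA ")
    except FileNotFoundError:
        return serveImage(Image.open(" maps/404.png "))

    # A "you are here" pointer (maps/here.png pasted at the x/y request
    # arguments) was sketched at this point but is not implemented.
    mapimage = Image.alpha_composite(mapImage, overlayImage)
    return serveImage(mapimage)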
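@app.route(' /parts/getfile/<filename> ')
def getfile(filename):
    """Serve the datasheet PDF named by ``filename`` from the datasheet directory.

    ``filename`` is taken from the URL, so it is checked against an allow-list
    pattern before it is used to build a file name. A name that does not match
    is answered with 400 (it used to be a 200 carrying the same text).
    send_from_directory additionally refuses paths that leave the directory.
    """
    # fullmatch() requires the whole string to match; with re.match() a name
    # ending in a newline still satisfied the `$` anchor. The pattern is a raw
    # string so its backslashes are not treated as string escapes.
    if re.fullmatch(r' ^[ \ w \ -_]+$ ', filename) is None:
        return ' No injections pls. ', 400
    return send_from_directory(' /srv/datasheets/ ', filename + ' .pdf ')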
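def _store_datasheet_upload(datasheet_file):
    """Save an uploaded datasheet under an unused name and return the URL recorded for it."""
    datasheet_dir = ' /srv/datasheets/ '
    datasheet_filename = secure_filename(datasheet_file.filename)
    if not datasheet_filename:
        # secure_filename() returns '' when nothing usable is left of the
        # client-supplied name.
        abort(400)
    stem, extension = os.path.splitext(datasheet_filename)
    i = 1
    # Probe the directory the file is actually saved to. The old check used a
    # relative path, so it never saw existing datasheets and an upload with a
    # name already in use overwrote the stored file.
    while os.path.isfile(datasheet_dir + datasheet_filename):
        datasheet_filename = stem + str(i) + extension
        i += 1
    # NOTE(review): neither the extension nor the content of the upload is
    # checked, although getfile only ever serves names ending in .pdf.
    datasheet_file.save(datasheet_dir + datasheet_filename)
    return ' http://elab.kth.se/parts/getfile/ ' + datasheet_filename


def _remove_replaced_datasheet(stored_value):
    """Delete the local file behind a datasheet URL generated by this app; ignore anything else."""
    url_prefix = ' http://elab.kth.se/parts/getfile/ '
    if not stored_value or not stored_value.startswith(url_prefix):
        # Nothing stored, or an external URL supplied through the form:
        # there is no local file belonging to it.
        return
    # Reduce the remainder to a bare file name so that the value kept in the
    # database can never select a path outside the datasheet directory.
    old_name = secure_filename(stored_value[len(url_prefix):])
    if not old_name:
        return
    try:
        os.remove(' /srv/datasheets/ ' + old_name)
    except FileNotFoundError:
        pass  # already gone; nothing left to clean up


@app.route(' /parts/alter/<partID> ', methods=[' POST '])
# @requires_auth
def alter(partID):
    """Create a part (negative id) or update an existing one, with an optional datasheet.

    The part number, description and location id are read from the POSTed
    form. The datasheet is an uploaded file, else a URL given in the form,
    else None for a new part or the previously stored value for an existing
    one. Returns a JSON status string carrying the id of the affected part.

    Changes from the previous version:
      * a non-numeric id gives 400 and an unknown id gives 404, not HTTP 500;
      * form fields are read before an upload is written, so a request that
        lacks them no longer leaves an orphaned file behind;
      * the name-collision check looks in the directory the file is saved to;
      * has_key(), which the form object lacks on Python 3, became ``in``;
      * the replaced datasheet is deleted only after the row was updated and
        only by bare file name. The stored value used to be appended to the
        directory path unchanged, although it is a full URL and may have been
        supplied through the form.

    NOTE(review): the @requires_auth line above is commented out, so any
    client that can reach the server may create or modify parts and upload
    files. The datasheet URL taken from the form is stored without any check
    of its scheme or host.
    """
    try:
        partID = int(partID)
    except ValueError:
        abort(400)

    partno = request.form[' partno ']
    description = request.form[' description ']
    location_id = request.form[' location_id ']
    has_upload = len(request.files) != 0
    replaced_datasheet = None

    if partID < 0:
        # New entry
        if has_upload:
            datasheet_filename = _store_datasheet_upload(request.files[' datasheet-file '])
        elif ' datasheet-url ' in request.form:
            datasheet_filename = request.form[' datasheet-url ']
        else:
            datasheet_filename = None
        s = text(
            ' insert into parts (partno, description, datasheet, location_id) '
            ' values (:partno, :description, :datasheet, :location_id) returning id; '
        )
        r = db_engine.execute(s, partno=partno,
                              description=description,
                              datasheet=datasheet_filename,
                              location_id=location_id)
    else:
        # Modify entry
        r = db_engine.execute(text(' select * from parts where id=:id; '), id=partID)
        try:
            existing = r.fetchone()
        finally:
            r.close()
        if existing is None:
            abort(404)
        previous_datasheet = dict(existing)[' datasheet ']

        if has_upload:
            datasheet_filename = _store_datasheet_upload(request.files[' datasheet-file '])
            replaced_datasheet = previous_datasheet
        elif ' datasheet-url ' in request.form:
            datasheet_filename = request.form[' datasheet-url ']
        else:
            datasheet_filename = previous_datasheet
        s = text(
            ' update parts '
            ' set partno=:partno, description=:description, datasheet=:datasheet, location_id=:location_id '
            ' where id=:id returning id; '
        )
        r = db_engine.execute(s, partno=partno,
                              description=description,
                              datasheet=datasheet_filename,
                              location_id=location_id,
                              id=partID)

    new_id = r.fetchone()[0]
    r.close()
    if replaced_datasheet is not None:
        _remove_replaced_datasheet(replaced_datasheet)
    return ' { " status " : " ok " , " part_id " : ' + str(new_id) + ' } '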
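@app.route(' /parts/delete/<partID> ')
# @requires_auth
def delete(partID):
    """Delete the part with the given non-negative id and return a JSON status string.

    A non-numeric or negative id is answered with 400; a non-numeric id used
    to raise an unhandled ValueError. The query result is now closed.

    NOTE(review): the @requires_auth line above is commented out, and the route
    declares no methods, so it answers GET. A destructive action is therefore
    open to any client that can reach the server, and browsers or crawlers can
    trigger it simply by following a link. Consider restoring the decorator
    and restricting the route to POST or DELETE.
    """
    try:
        part_id = int(partID)
    except ValueError:
        abort(400)
    if part_id < 0:
        abort(400)
    result = db_engine.execute(text(' delete from parts where id=:id; '), id=part_id)
    result.close()
    return ' { " status " : " ok " } '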
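@app.route(' /parts/deleteLocation/<locationID> ')
# @requires_auth
def deleteLocation(locationID):
    """Delete the location with the given non-negative id and return a JSON status string.

    A non-numeric or negative id is answered with 400; a non-numeric id used
    to raise an unhandled ValueError. The query result is now closed.

    NOTE(review): the @requires_auth line above is commented out, and the route
    declares no methods, so it answers GET. A destructive action is therefore
    open to any client that can reach the server, and browsers or crawlers can
    trigger it simply by following a link. Consider restoring the decorator
    and restricting the route to POST or DELETE.
    """
    try:
        location_id = int(locationID)
    except ValueError:
        abort(400)
    if location_id < 0:
        abort(400)
    result = db_engine.execute(text(' delete from locations where id=:id; '), id=location_id)
    result.close()
    return ' { " status " : " ok " } '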
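def connect(user, password, db, host=' localhost ', port=5432):
    """Return an engine for the PostgreSQL database and a reflected MetaData object.

    The user name and password are percent-encoded before they are placed in
    the connection URL. Unencoded, a password containing ``@``, ``:`` or ``/``
    changes how the URL is split into credentials, host and database.

    NOTE(review): the URL built here contains the password in clear text, so
    it should not be written to logs or error pages.
    """
    # We connect with the help of the PostgreSQL URL
    url = ' postgresql:// {} : {} @ {} : {} / {} '
    url = url.format(quote(str(user), safe=''), quote(str(password), safe=''), host, port, db)
    # The return value of create_engine() is our connection object
    con = sqlalchemy.create_engine(url, client_encoding=' utf8 ')
    # We then bind the connection to MetaData()
    meta = sqlalchemy.MetaData(bind=con, reflect=True)
    return con, meta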
if __name__ == ' __main__ ':
    # Script entry point: load the database credentials, connect, then start
    # Flask's built-in server.
    # The credentials are read from a JSON file in the current working
    # directory.
    # NOTE(review): that file holds the database password; confirm its
    # permissions keep it unreadable by other local users.
    with open(' admin.json ') as f:
        postgres_credentials = json.load(f)
    # Rebinds the module-level placeholders; connect() falls back to its
    # default host and port because only user, password and database are given.
    db_engine, db_metadata = connect(postgres_credentials[' username '], postgres_credentials[' password '], ' parts_v2 ')
    parts = sqlalchemy.Table(' parts ', db_metadata)
    # Example query
    # (the block below is a bare string literal, so it is never executed)
    ''' s = select([parts]).where(parts.c.notes != ' ' )
for row in db_engine . execute ( s ) :
print row '''
    # NOTE(review): the host argument makes the server listen on every network
    # interface, and the @requires_auth decorators on the alter and delete
    # routes are commented out. Any host that can reach this port can therefore
    # modify or delete data; no TLS is configured on this call either.
    app.run(' 0.0.0.0 ')