screwdriver/Postgres_API.py
2025-02-06 16:10:58 +08:00

349 lines
12 KiB
Python

import time
import psycopg2
import json
# MODEL
class aoi_allresults_detail:
def __init__(self):
pass
def all_list(self):
awm_key = "awm_key"
sn = "sn"
aoi_start = "aoi_start"
aoi_end = "aoi_end"
aoi_cv_lead_time = "aoi_cv_lead_time"
aoi_op_lead_time = "aoi_op_lead_time"
pass_or_failure = "pass_or_failure"
isfirstchk = "isfirstchk"
list = [awm_key, sn, aoi_start, aoi_end, aoi_cv_lead_time, aoi_op_lead_time, pass_or_failure, isfirstchk]
return list
def key_list(self):
awm_key = "awm_key"
sn = "sn"
list = [awm_key, sn]
return list
def key_list_onlyawm_key(self):
awm_key = "awm_key"
list = [awm_key]
return list
# Control
class SQL_aoi_allresults_detail(aoi_allresults_detail):
def __init__(self, IP, port, user, password, database):
super().__init__()
# 資料庫連線設定
self.IP = IP
self.port = port
self.user = user
self.password = password
self.database = database
self.db = psycopg2.connect(database=str(self.database), user=str(self.user), password=str(self.password),
host=str(self.IP), port=int(self.port))
# 建立操作游標
self.cursor = self.db.cursor()
# 獲取列名
def get_column(self, table_name):
sql = f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table_name}'"
self.cursor.execute(sql)
self.column_data = self.cursor.fetchall()
col_list = [col[0] for col in self.column_data]
print(f'self.column_data = {self.column_data}')
# 将元组列表转换为字典
data_dict = {item: None for item in col_list}
# 将字典转换为 JSON 字符串
column_json = json.dumps(data_dict)
# 将 JSON 字符串转换为字典
column_json = json.loads(column_json)
# 打印 JSON 字符串
return (column_json)
def get_barcode_aoi_recipe_master(self, part_no):
sql = f"SELECT * FROM aoi_recipe_master vri WHERE vri.part_no = '{part_no}'"
# sql = f"SELECT * FROM aoi_recipe_master WHERE vri.awm_key = '{awm_key}'"
self.cursor.execute(sql)
all_data = self.cursor.fetchall()
return all_data[0]
def get_barcode(self, awm_key):
sql = f"SELECT * FROM aoi_op_master vri WHERE vri.awm_key = '{awm_key}'"
# sql = f"SELECT * FROM aoi_recipe_master WHERE vri.awm_key = '{awm_key}'"
self.cursor.execute(sql)
all_data = self.cursor.fetchall()
return all_data[0]
# 獲取某表單全部資料
def get_table_all_data(self, table_name):
# col_json = self.get_column(table_name)
col_list = self.all_list()
sql = f'select * from {str(table_name)}'
self.cursor.execute(sql)
all_data = self.cursor.fetchall()
# print(f'all_data = {all_data}')
all_data_json = []
for data in all_data:
col_json = {} # 創建新的字典以儲存每筆資料
# print(f'data = {data}')
for i in range(0, len(data)):
col_json[col_list[i]] = data[i]
# print(f'col_json = {col_json}')
all_data_json.append(col_json)
return (sorted(all_data_json, key=lambda x: x['aoi_start']))
# 獲取某表單指定資料
def get_table_one_data(self, table_name, key):
col_json = self.get_column(table_name)
col_list = self.key_list()
sql = f'select * from {str(table_name)} where '
for i in range(0, len(col_list)):
sql = sql + f"{col_list[i]} = '{key[col_list[i]]}'"
if i < len(col_list) - 1:
sql = sql + ' and '
# print(f'SQL = {sql}')
self.cursor.execute(sql)
data = self.cursor.fetchall()[0]
col_list = self.all_list()
for i in range(0, len(data)):
col_json[col_list[i]] = data[i]
return col_json
def get_table_one_data_onlyawm_key(self, table_name, key):
col_json = self.get_column(table_name)
col_list = self.key_list_onlyawm_key()
sql = f'select * from {str(table_name)} where '
for i in range(0, len(col_list)):
sql = sql + f"{col_list[i]} = '{key[col_list[i]]}'"
if i < len(col_list) - 1:
sql = sql + ' and '
# print(f'SQL = {sql}')
self.cursor.execute(sql)
all_data = self.cursor.fetchall()
col_list = self.all_list()
all_data_json = []
for data in all_data:
col_json = {} # 創建新的字典以儲存每筆資料
# print(f'data = {data}')
for i in range(0, len(data)):
col_json[col_list[i]] = data[i]
# print(f'col_json = {col_json}')
all_data_json.append(col_json)
return (all_data_json)
# 新增資料
def post_table_one_data(self, table_name, data):
col_list = self.all_list()
sql = f'INSERT INTO {table_name} ' + '('
for i in range(0, len(col_list)):
sql = sql + f'{col_list[i]}'
if i < len(col_list) - 1:
sql = sql + ","
else:
sql = sql + ") Values ("
for i in range(0, len(col_list)):
if (type(data[col_list[i]])) == str:
sql = sql + f"'{data[col_list[i]]}'"
else:
sql = sql + f'{data[col_list[i]]}'
if i < len(col_list) - 1:
sql = sql + ','
else:
sql = sql + ')'
print(sql)
try:
self.cursor.execute(sql)
except psycopg2.ProgrammingError as exc:
print(exc)
self.db.rollback()
self.db = psycopg2.connect(database=str(self.database), user=str(self.user), password=str(self.password),
host=str(self.IP), port=int(self.port))
self.cursor = self.db.cursor()
self.cursor.execute(sql)
self.db.commit()
# MODEL
class aoi_ng_detail:
def __init__(self):
pass
def all_list(self):
awm_key = "awm_key"
sn = "sn"
ng1 = "ng1"
ng2 = "ng2"
ng3 = "ng3"
manulChk = "\"manulChk\""
list = [awm_key, sn, ng1, ng2, ng3, manulChk]
return list
def key_list(self):
awm_key = "awm_key"
sn = "sn"
list = [awm_key, sn]
return list
# Control
class SQL_aoi_ng_detail(aoi_ng_detail):
def __init__(self, IP, port, user, password, database):
super().__init__()
# 資料庫連線設定
self.IP = IP
self.port = port
self.user = user
self.password = password
self.database = database
self.db = psycopg2.connect(database=str(self.database), user=str(self.user), password=str(self.password),
host=str(self.IP), port=int(self.port))
# 建立操作游標
self.cursor = self.db.cursor()
# 獲取列名
def get_column(self, table_name):
sql = f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table_name}'"
self.cursor.execute(sql)
self.column_data = self.cursor.fetchall()
# 打印列名
# print(self.column_data)
# 将元组列表转换为字典
data_dict = {item[0]: None for item in self.column_data}
# 将字典转换为 JSON 字符串
column_json = json.dumps(data_dict)
# 将 JSON 字符串转换为字典
column_json = json.loads(column_json)
# 打印 JSON 字符串
return (column_json)
# 獲取某表單全部資料
def get_table_all_data(self, table_name):
# col_json = self.get_column(table_name)
col_list = self.all_list()
sql = f'select * from {str(table_name)}'
self.cursor.execute(sql)
all_data = self.cursor.fetchall()
all_data_json = []
for data in all_data:
col_json = {}
for i in range(0, len(data)):
col_json[col_list[i]] = data[i]
all_data_json.append(col_json)
return (all_data_json)
# 獲取某表單指定資料
def get_table_one_data(self, table_name, key):
col_json = self.get_column(table_name)
col_list = self.key_list()
sql = f'select * from {str(table_name)} where '
for i in range(0, len(col_list)):
sql = sql + f"{col_list[i]} = '{key[col_list[i]]}'"
if i < len(col_list) - 1:
sql = sql + " and "
self.cursor.execute(sql)
data = self.cursor.fetchall()[0]
col_list = self.all_list()
for i in range(0, len(data)):
col_json[col_list[i]] = data[i]
return col_json
# 新增資料
def post_table_one_data(self, table_name, data):
col_list = self.all_list()
sql = f'INSERT INTO {table_name} ' + "("
for i in range(0, len(col_list)):
sql = sql + f'{col_list[i]}'
if i < len(col_list) - 1:
sql = sql + ","
else:
sql = sql + ") Values ("
print(sql)
for i in range(0, len(col_list)):
if (type(data[col_list[i]])) == str:
sql = sql + f"'{data[col_list[i]]}'"
else:
sql = sql + f'{data[col_list[i]]}'
if i < len(col_list) - 1:
sql = sql + ","
else:
sql = sql + ")"
print(sql)
try:
self.cursor.execute(sql)
except:
# self.db.rollback()
self.db = psycopg2.connect(database=str(self.database), user=str(self.user), password=str(self.password),
host=str(self.IP), port=int(self.port))
self.cursor = self.db.cursor()
self.cursor.execute(sql)
self.db.commit()
print(sql)
def delete_table_one_data(self, table_name, awm_key, sn):
sql = f'DELETE FROM public.{table_name} WHERE awm_key=\'{awm_key}\' AND sn=\'{sn}\';'
self.cursor.execute(sql)
self.db.commit()
# print(sql)
def AOI_all():
AOI_detail_SQL = SQL_aoi_allresults_detail(IP="140.125.20.181", port="5432", user="vendor_aoi",
password="@01t;3g;l", database="AOI")
# 調用所有資料
all_data = AOI_detail_SQL.get_table_all_data(table_name="aoi_allresults_detail")
print(all_data)
# 調用單筆資料
KEY = {
"awm_key": 'M11-2303001-0050',
"sn": "1"
}
one_data = AOI_detail_SQL.get_table_one_data(table_name="aoi_allresults_detail", key=KEY)
print(one_data)
# 新增資料
data = {
"awm_key": "M11-2303001-0051",
"sn": "3",
"aoi_start": "2023-09-08 20:09:13.000",
"aoi_end": "2023-09-08 20:09:13.000",
"aoi_cv_lead_time": 2.2,
"aoi_op_lead_time": 2,
"pass_or_failure": True,
}
AOI_detail_SQL.post_table_one_data(table_name="aoi_allresults_detail", data=data)
def AOI_ng():
AOI_NG_detail_SQL = SQL_aoi_ng_detail(IP="140.125.20.183", port="5432", user="postgres", password="EL404el404",
database="postgres")
# 調用所有資料
all_data = AOI_NG_detail_SQL.get_table_all_data(table_name="aoi_ng_detail")
print(all_data)
# 調用單筆資料
KEY = {
"awm_key": 'M11-2303001-0050',
"sn": "2"
}
one_data = AOI_NG_detail_SQL.get_table_one_data(table_name="aoi_ng_detail", key=KEY)
print(one_data)
# 新增資料
data = {
"awm_key": "M11-2303001-0051",
"sn": "3",
"ng1": "1,2,3",
"ng2": "",
"ng3": ""
}
# AOI_NG_detail_SQL.post_table_one_data(table_name="aoi_ng_detail", data=data)
# AOI_all()
# AOI_ng()