import logging
import re
import time
import app
import main
import threading
import log
from typing import List
import OpenOPC
import _thread
import pythoncom
import win32com.client
from win32com.client import gencache
import win32timezone
from win32com.client import Dispatch
from win32com.client import DispatchWithEvents
import pywintypes

# Expose TimeType under the name ``pywintypes.datetime``.
# NOTE(review): presumably a compatibility shim for code that looks up
# ``pywintypes.datetime`` -- confirm before removing.
pywintypes.datetime = pywintypes.TimeType


class opc_server(threading.Thread):
    """Background thread that polls an OPC server and pushes pending writes.

    The thread repeatedly connects to the OPC server, reads every tag listed
    in ``app.opc_value.opcaddrlist`` into the shared read dictionary, and
    writes every tag whose entry in the write-status dictionary equals True.
    Access to the shared dictionaries is serialised with ``main.threadlock``.
    The connection state is published through ``app.user_var.opc_conn_stat``.
    """

    def __init__(self, threadID, name, counter):
        """Store thread bookkeeping values and bind the shared app state.

        Args:
            threadID: Caller-supplied identifier, stored as ``self.threadID``.
            name: Thread name, stored as ``self.name``.
            counter: Caller-supplied value, stored as ``self.counter``.
        """
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
        self.user_var = app.user_var
        self.opcServer = OpenOPC.client()
        self.opcaddrlist = app.opc_value.opcaddrlist
        self.opc_value = app.opc_value
        self.rdkv = self.opc_value.opcrdkv        # tag -> last value read
        self.wrkv = self.opc_value.opcwrkv        # tag -> value to write
        self.wrstatkv = app.opc_value.opcwrstatkv  # tag -> "write pending" flag
        self.exit_stat = False                    # set by exit() to stop the loops
        self.log_file = log.log_file
        self.log_print = log.log_file
        self.readtag_stat = False                 # True while a read is in progress
        self.group = self.opc_value.stationgroup

    def OnDataChange(self, TransactionID, NumItems, ClientHandles, ItemValues, Qualities, TimeStamps):
        """Store the values delivered by a data-change callback.

        For each of the ``NumItems`` entries the tag name is looked up as
        ``self.opcaddrlist[handle - 1]`` (handles are treated as 1-based
        indices into the address list) and its value is written to
        ``self.rdkv``. Each item is also printed.

        The parameter names are kept exactly as in the original signature.
        NOTE(review): they look like a COM event-sink signature -- confirm
        before renaming.
        """
        print("callback")
        for idx in range(NumItems):
            handle = ClientHandles[idx]
            value = ItemValues[idx]
            quality = Qualities[idx]
            # Not named ``time`` so the ``time`` module is not shadowed.
            timestamp = TimeStamps[idx]
            tag = self.opcaddrlist[handle - 1]
            print(
                'item:{0} , value:{1} , quality:{2}, time:{3}'.format(tag, value, quality, timestamp))
            self.rdkv[tag] = value

    def connect_opc(self, progID: str):
        """Connect ``self.opcServer`` to ``progID`` on localhost.

        Args:
            progID: ProgID of the OPC server to connect to.

        Returns:
            The connected ``self.opcServer`` client.
        """
        self.opcServer.connect(progID, 'localhost')
        self.log_print.logger.info('已连接到opc - [{}]'.format(progID))
        return self.opcServer

    def disconnect_opc(self, opcServer):
        """Log the disconnect and close ``self.opcServer``.

        Args:
            opcServer: Unused; kept for backward compatibility. The client
                stored on the instance is the one that gets closed.
        """
        self.log_print.logger.info('opc断开连接')
        # The OpenOPC client exposes ``close()`` (as already used by
        # ``opc_start``); the previous ``Disconnect()`` call is not part of
        # that client API.
        self.opcServer.close()

    def asyn_write_items(self):
        """Write every tag whose write-status flag equals True.

        Builds ``(tag, value)`` pairs from ``self.wrkv`` for each key in
        ``self.wrstatkv`` whose value compares equal to True and sends them
        in a single ``write`` call. Nothing is sent when no tag is pending.
        The flags themselves are not modified here.
        """
        # ``== True`` is kept on purpose: the flag values come from shared
        # application state and plain truthiness would accept more values.
        pending = [
            (tag, self.wrkv[tag])
            for tag, flag in self.wrstatkv.items()
            if flag == True  # noqa: E712
        ]
        if pending:
            self.opcServer.write(pending)

    def opc_start(self):
        """Run one connect / poll / write session until exit or failure.

        Connects to ``Kepware.KEPServerEX.V6`` and then loops until
        ``self.exit_stat`` is set: each pass reads all configured tags,
        updates ``self.rdkv`` and ``user_var.opc_conn_stat``, flushes pending
        writes and sleeps 10 ms. A write error is logged and ends the
        session; any other error (including a failed connection) is logged
        as well. The method does not raise for those errors so that ``run``
        can retry.

        On exit the client is closed if a connection had been established;
        otherwise ``user_var.opc_conn_stat`` is set to "OPC服务器未连接".
        """
        opc_conn = False
        try:
            # Connecting happens inside the ``try`` so that a failed
            # connection is logged and reported through opc_conn_stat
            # instead of terminating the thread.
            self.opcServer = OpenOPC.client()
            self.opcServer = self.connect_opc('Kepware.KEPServerEX.V6')
            opc_conn = True

            while not self.exit_stat:
                if not self.readtag_stat:
                    self._read_tags()

                with main.threadlock:
                    try:
                        self.asyn_write_items()
                    except Exception as e:
                        self.log_file.logger.error('opc写异常: {0}'.format(e), exc_info=True, stack_info=True)
                        # Leave the session; run() will start a new one.
                        break
                time.sleep(0.01)
        except Exception as e:
            self.log_file.logger.error('opc工作异常: {0}'.format(e), exc_info=True, stack_info=True)
        finally:
            if opc_conn:
                try:
                    self.opcServer.close()
                except Exception as e:
                    # A failing close must not kill the worker thread.
                    self.log_file.logger.error('opc关闭异常: {0}'.format(e), exc_info=True, stack_info=True)
            else:
                self.user_var.opc_conn_stat = "OPC服务器未连接"

    def _read_tags(self):
        """Read all configured tags once and publish the results.

        Values with quality 'Good' are copied into ``self.rdkv``. When every
        returned item has quality 'Error' or 'Bad' (or nothing was returned)
        ``user_var.opc_conn_stat`` is set to "PLC未连接", otherwise to
        "PLC已连接". ``self.readtag_stat`` is True for the duration of the
        call and is always reset, even when the read raises.
        """
        self.readtag_stat = True
        try:
            failed = 0
            results = self.opcServer.read(self.opcaddrlist, include_error=True)
            with main.threadlock:
                for item in results:
                    quality = item[2]
                    if quality == 'Good':
                        self.rdkv[item[0]] = item[1]
                    elif quality in ('Error', 'Bad'):
                        failed += 1
                if failed >= len(results):
                    self.user_var.opc_conn_stat = "PLC未连接"
                else:
                    self.user_var.opc_conn_stat = "PLC已连接"
        finally:
            # Without this reset a single failed read would leave the flag
            # set and every later session would skip reading.
            self.readtag_stat = False

    def exit(self):
        """Request the worker loops to stop."""
        self.exit_stat = True

    def run(self):
        """Thread entry point: start a new session every second until exit."""
        # NOTE: an explicit pythoncom.CoInitialize() call was disabled here
        # in the original code and is intentionally not reinstated.
        while not self.exit_stat:
            time.sleep(1)
            self.opc_start()
        print("opc进程结束!")