Python 中接口和类之间的 UML 聚合
UML aggregation between Interface and Classes in Python
我想从我写的一个小程序创建一个 UML。到目前为止,我只为 Java 程序制作了 UML 图,而为 python 画一张图对我来说还很新鲜。
问题在于:有一个 Writer“接口”,也就是一个以 abc.ABCMeta 作为元类、并声明了抽象方法 write() 的 class。
这个接口由两个 class 实现:Database 和 CSVWriter。
另一个名为 DataCollector 的 class 的构造函数(即 __init__() 方法)接收一个实现了 Writer 接口的 class 的实例作为参数,随后这个 CSVWriter 或 Database 实例会作为实例变量保存在 DataCollector 对象中。
如何在 UML 中表示这种关系?Python 其实并没有真正的接口,在我看来这只不过是继承自那个“接口”而已。
我尝试了 UML,并将 DataCollector 与 WriterInterface 聚合在一起。在接口和 class 之间使用聚合是否可以,还是我必须在实现接口的 Class 和 Class DataCollector 之间绘制聚合?
到目前为止我是这样画的:
UML 基于的代码:
import os
import abc
import Adafruit_DHT
import smbus
import csv
import RPi.GPIO as GPIO
import time
import datetime
import mh_z19
#import psycopg2
import config as cfg
# Settings for the I2C light sensor read by SensorConnector.
# NOTE(review): address 0x23 and these opcodes look like a BH1750 — confirm.
I2C_SETTINGS = {
    # Device address passed as the first argument of read_i2c_block_data.
    "DEVICE": 0x23,
    # The next three values are not referenced by the code shown here.
    "POWER_DOWN": 0x00,
    "POWER_ON": 0x01,
    "RESET": 0x07,
    # Value handed to read_light() and forwarded as the second argument of
    # read_i2c_block_data.
    "RESOLUTION": {"ONE_TIME_HIGH_RES_MODE_1": 0x20},
    # Bus number given to smbus.SMBus.
    "BUS": 1,
}
# Column names used as csv.DictWriter fieldnames; they are the same keys as
# the record dict assembled in DataCollector.collect_data.
CSV_HEADERS = [
    "timestamp",
    "light",
    "humidity",
    "temperature",
    "co2",
    "occupancy",
    "motion_count",
]
class SensorConnector:
    """Single access point for the PIR pin, the DHT22, the I2C light sensor and the MH-Z19."""

    def __init__(self, pin_pir, pin_dht22, i2c_settings):
        """Store the wiring, open the I2C bus and configure the PIR pin as an input.

        Args:
            pin_pir: Pin of the PIR sensor (BCM numbering is selected here).
            pin_dht22: Pin handed to ``Adafruit_DHT.read_retry``.
            i2c_settings: Mapping that provides at least "BUS", "DEVICE"
                and "RESOLUTION".
        """
        self.pin_pir, self.pin_dht22 = pin_pir, pin_dht22
        self.dht = Adafruit_DHT.DHT22
        self.i2c_settings = i2c_settings
        bus_number = i2c_settings["BUS"]
        self.bus = smbus.SMBus(bus_number)
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.pin_pir, GPIO.IN)

    def convert_to_number(self, data):
        """Merge two bytes (``data[0]`` is the high byte) and divide the result by 1.2."""
        low_byte = data[1]
        high_byte = data[0]
        return (low_byte + (256 * high_byte)) / 1.2

    def read_light(self, resolution):
        """Read a block from the configured I2C device and convert it to a number."""
        device = self.i2c_settings["DEVICE"]
        raw = self.bus.read_i2c_block_data(device, resolution)
        return self.convert_to_number(raw)

    def read_temp_hum(self):
        """Return ``(humidity, temperature)`` as delivered by ``Adafruit_DHT.read_retry``."""
        humidity, temperature = Adafruit_DHT.read_retry(self.dht, self.pin_dht22)
        return humidity, temperature

    def read_sensors(self):
        """Query every sensor once and return the readings as a dict.

        Keys: "light", "humidity", "temperature" and "co2".
        """
        mode = self.i2c_settings["RESOLUTION"]["ONE_TIME_HIGH_RES_MODE_1"]
        # Keep the read order: light, then DHT22, then CO2.
        light = self.read_light(mode)
        humidity, temperature = self.read_temp_hum()
        co2 = mh_z19.read()["co2"]
        return {
            "light": light,
            "humidity": humidity,
            "temperature": temperature,
            "co2": co2,
        }
class WriterInterface(metaclass=abc.ABCMeta):
    """Abstract sink for records; any class with a callable ``write`` counts as a subclass."""

    @classmethod
    def __subclasshook__(cls, subclass):
        """Decide whether ``subclass`` should be treated as a virtual subclass.

        Returns:
            ``True`` when ``subclass`` has a callable ``write`` attribute,
            ``NotImplemented`` otherwise.
        """
        if hasattr(subclass, "write") and callable(subclass.write):
            return True
        return NotImplemented

    @abc.abstractmethod
    def write(self, data):
        """Persist one record; concrete writers have to override this."""
        raise NotImplementedError
class CSVWriter(WriterInterface):
    """Writer that appends every record as one row of a CSV file.

    The file is opened once in append mode and kept open; call ``close()``
    or use the instance as a context manager to release it.
    """

    def __init__(self, file, headers):
        """Open ``file`` for appending and emit the header row if it is needed.

        Args:
            file: Path of the CSV file; it is created when missing.
            headers: Field names, used both for the header row and to map
                the keys of each record to columns.
        """
        # A file that exists but is still empty needs its header as well.
        needs_header = not os.path.isfile(file) or os.path.getsize(file) == 0
        # newline="" leaves line endings to the csv module (see csv docs).
        self.csv_file = open(file, "a", newline="")
        self.writer = csv.DictWriter(
            self.csv_file, delimiter=",", lineterminator="\n", fieldnames=headers
        )
        if needs_header:
            self.writer.writeheader()
        print("initialized csv_file")

    def write(self, data):
        """Append ``data`` (a dict keyed by the header names) and flush to disk."""
        self.writer.writerow(data)
        # Flush per row so a power loss costs at most the current record.
        self.csv_file.flush()

    def close(self):
        """Close the underlying file; calling it more than once is harmless."""
        self.csv_file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
class Database(WriterInterface):
    """Writer that inserts every complete record into the "sensordata" table."""

    # Parameterised statement; the driver binds the values.
    _INSERT_SQL = (
        'insert into "sensordata" (created_time, temperature, humidity, '
        "light, occupancy, people_count) values (%s, %s, %s, %s, %s, %s);"
    )
    # Record keys in placeholder order ("motion_count" feeds people_count).
    _INSERT_KEYS = (
        "timestamp",
        "temperature",
        "humidity",
        "light",
        "occupancy",
        "motion_count",
    )

    def __init__(self):
        """Connect using the "host", "db", "user" and "passwd" entries of ``cfg.postgres``."""
        # Imported locally: the module-level import is commented out, so the
        # name would otherwise be undefined when this class is instantiated.
        import psycopg2

        self.con = psycopg2.connect(
            host=cfg.postgres["host"],
            dbname=cfg.postgres["db"],
            user=cfg.postgres["user"],
            password=cfg.postgres["passwd"],
        )

    def write(self, data):
        """Insert one record; records containing ``None`` are reported and skipped.

        Errors are printed rather than raised so that one failed insert
        does not stop the caller.
        """
        if None in data.values():
            print(f"problem with data:\n{data}")
            return
        try:
            params = tuple(data[key] for key in self._INSERT_KEYS)
            cur = self.con.cursor()
            try:
                cur.execute(self._INSERT_SQL, params)
                self.con.commit()
            except Exception:
                # Without a rollback the connection stays in a failed
                # transaction and every later insert is rejected too.
                self.con.rollback()
                raise
            finally:
                cur.close()
        except Exception as e:
            print(e)

    def close(self):
        """Close the database connection."""
        self.con.close()
class DataCollector:
    """Sample the sensors periodically, track occupancy and pass each record to a writer."""

    def __init__(self, s_connector, writer, poll_interval=60, vacancy_timeout=900):
        """Set up the collector.

        Args:
            s_connector: Object providing ``pin_pir`` and ``read_sensors()``.
            writer: Object providing ``write(data)``.
            poll_interval: Seconds to sleep between two samples.
            vacancy_timeout: Seconds without motion after which the room
                is considered vacant.
        """
        self.s_connector = s_connector
        self.writer = writer
        self.poll_interval = poll_interval
        self.vacancy_timeout = vacancy_timeout
        self.ts_last_motion = None
        self.occupied_state = False
        self.motion_count = 0

    def motion_handler(self, channel):
        """GPIO edge callback: record the motion and mark the room occupied.

        Args:
            channel: Supplied by the GPIO library; not used here.
        """
        print("motion detected")
        self.ts_last_motion = datetime.datetime.now()
        self.occupied_state = True
        # NOTE(review): assumes every motion event is counted, not only the
        # vacant -> occupied transitions — confirm the intended meaning.
        self.motion_count += 1

    def refresh_occupancy(self, now=None):
        """Mark the room vacant when the last motion is older than ``vacancy_timeout``.

        Args:
            now: Reference time; defaults to ``datetime.datetime.now()``.
        """
        if not self.occupied_state or self.ts_last_motion is None:
            return
        if now is None:
            now = datetime.datetime.now()
        # total_seconds() includes whole days; .seconds would wrap at 24 h.
        idle_seconds = (now - self.ts_last_motion).total_seconds()
        if idle_seconds > self.vacancy_timeout:
            print("Room vacant at: ")
            print(time.strftime("%H"), ":", time.strftime("%M"))
            self.occupied_state = False

    def collect_data(self):
        """Register the PIR callback, then sample and write records forever."""
        GPIO.add_event_detect(
            self.s_connector.pin_pir, GPIO.RISING, callback=self.motion_handler
        )
        while True:
            self.refresh_occupancy(datetime.datetime.now())
            data = {
                **self.s_connector.read_sensors(),
                "occupancy": self.occupied_state,
                # Taken after the sensor reads, which can block for a while.
                "timestamp": datetime.datetime.now(),
                "motion_count": self.motion_count,
            }
            print(data)
            self.writer.write(data)
            time.sleep(self.poll_interval)
if __name__ == "__main__":
    # Wire the sensors to a CSV sink and start the endless sampling loop.
    sensors = SensorConnector(pin_pir=17, pin_dht22=4, i2c_settings=I2C_SETTINGS)
    sink = CSVWriter("/path/to/data.csv", CSV_HEADERS)
    # A Database() instance can be passed as the writer instead.
    collector = DataCollector(sensors, sink)
    collector.collect_data()
Is it ok to use an aggregation between an interface and a class?
是的,这是正确的方法
Do I have to draw aggregations between the classes that implement the interface and the class DataCollector?
不,有几个原因:
接口的目的就是隐藏具体的实现;如果与每个实现类都建立关系,就会暴露这些实现,并带来随之而来的所有后果。
以后可能会出现新的实现该接口的类,你不会希望每出现一个新类,就要修改 DataCollector、为它再添加一条关系。
DataCollector 只有一个 writer;如果画多条关系,它们之间就必须是互斥的,这样做徒增复杂度而毫无益处。
我想从我写的一个小程序创建一个 UML。到目前为止,我只为 Java 程序制作了 UML 图,而为 python 画一张图对我来说还很新鲜。 问题在于:有一个 Writer“接口”,也就是一个以 abc.ABCMeta 作为元类、并声明了抽象方法 write() 的 class。 这个接口由两个 class 实现:Database 和 CSVWriter。 另一个名为 DataCollector 的 class 的构造函数(即 __init__() 方法)接收一个实现了 Writer 接口的 class 的实例作为参数,随后这个 CSVWriter 或 Database 实例会作为实例变量保存在 DataCollector 对象中。
如何在 UML 中表示这种关系?Python 其实并没有真正的接口,在我看来这只不过是继承自那个“接口”而已。 我试着画了 UML,并在 DataCollector 与 WriterInterface 之间画了聚合。在接口和 class 之间使用聚合可以吗?还是必须在实现该接口的各个 class 与 DataCollector 之间分别绘制聚合?
到目前为止我是这样画的:
UML 基于的代码:
import os
import abc
import Adafruit_DHT
import smbus
import csv
import RPi.GPIO as GPIO
import time
import datetime
import mh_z19
#import psycopg2
import config as cfg
# Settings for the I2C light sensor read by SensorConnector.
# NOTE(review): address 0x23 and these opcodes look like a BH1750 — confirm.
I2C_SETTINGS = {
    # Device address passed as the first argument of read_i2c_block_data.
    "DEVICE": 0x23,
    # The next three values are not referenced by the code shown here.
    "POWER_DOWN": 0x00,
    "POWER_ON": 0x01,
    "RESET": 0x07,
    # Value handed to read_light() and forwarded as the second argument of
    # read_i2c_block_data.
    "RESOLUTION": {"ONE_TIME_HIGH_RES_MODE_1": 0x20},
    # Bus number given to smbus.SMBus.
    "BUS": 1,
}
# Column names used as csv.DictWriter fieldnames; they are the same keys as
# the record dict assembled in DataCollector.collect_data.
CSV_HEADERS = [
    "timestamp",
    "light",
    "humidity",
    "temperature",
    "co2",
    "occupancy",
    "motion_count",
]
class SensorConnector:
    """Single access point for the PIR pin, the DHT22, the I2C light sensor and the MH-Z19."""

    def __init__(self, pin_pir, pin_dht22, i2c_settings):
        """Store the wiring, open the I2C bus and configure the PIR pin as an input.

        Args:
            pin_pir: Pin of the PIR sensor (BCM numbering is selected here).
            pin_dht22: Pin handed to ``Adafruit_DHT.read_retry``.
            i2c_settings: Mapping that provides at least "BUS", "DEVICE"
                and "RESOLUTION".
        """
        self.pin_pir, self.pin_dht22 = pin_pir, pin_dht22
        self.dht = Adafruit_DHT.DHT22
        self.i2c_settings = i2c_settings
        bus_number = i2c_settings["BUS"]
        self.bus = smbus.SMBus(bus_number)
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.pin_pir, GPIO.IN)

    def convert_to_number(self, data):
        """Merge two bytes (``data[0]`` is the high byte) and divide the result by 1.2."""
        low_byte = data[1]
        high_byte = data[0]
        return (low_byte + (256 * high_byte)) / 1.2

    def read_light(self, resolution):
        """Read a block from the configured I2C device and convert it to a number."""
        device = self.i2c_settings["DEVICE"]
        raw = self.bus.read_i2c_block_data(device, resolution)
        return self.convert_to_number(raw)

    def read_temp_hum(self):
        """Return ``(humidity, temperature)`` as delivered by ``Adafruit_DHT.read_retry``."""
        humidity, temperature = Adafruit_DHT.read_retry(self.dht, self.pin_dht22)
        return humidity, temperature

    def read_sensors(self):
        """Query every sensor once and return the readings as a dict.

        Keys: "light", "humidity", "temperature" and "co2".
        """
        mode = self.i2c_settings["RESOLUTION"]["ONE_TIME_HIGH_RES_MODE_1"]
        # Keep the read order: light, then DHT22, then CO2.
        light = self.read_light(mode)
        humidity, temperature = self.read_temp_hum()
        co2 = mh_z19.read()["co2"]
        return {
            "light": light,
            "humidity": humidity,
            "temperature": temperature,
            "co2": co2,
        }
class WriterInterface(metaclass=abc.ABCMeta):
    """Abstract sink for records; any class with a callable ``write`` counts as a subclass."""

    @classmethod
    def __subclasshook__(cls, subclass):
        """Decide whether ``subclass`` should be treated as a virtual subclass.

        Returns:
            ``True`` when ``subclass`` has a callable ``write`` attribute,
            ``NotImplemented`` otherwise.
        """
        if hasattr(subclass, "write") and callable(subclass.write):
            return True
        return NotImplemented

    @abc.abstractmethod
    def write(self, data):
        """Persist one record; concrete writers have to override this."""
        raise NotImplementedError
class CSVWriter(WriterInterface):
    """Writer that appends every record as one row of a CSV file.

    The file is opened once in append mode and kept open; call ``close()``
    or use the instance as a context manager to release it.
    """

    def __init__(self, file, headers):
        """Open ``file`` for appending and emit the header row if it is needed.

        Args:
            file: Path of the CSV file; it is created when missing.
            headers: Field names, used both for the header row and to map
                the keys of each record to columns.
        """
        # A file that exists but is still empty needs its header as well.
        needs_header = not os.path.isfile(file) or os.path.getsize(file) == 0
        # newline="" leaves line endings to the csv module (see csv docs).
        self.csv_file = open(file, "a", newline="")
        self.writer = csv.DictWriter(
            self.csv_file, delimiter=",", lineterminator="\n", fieldnames=headers
        )
        if needs_header:
            self.writer.writeheader()
        print("initialized csv_file")

    def write(self, data):
        """Append ``data`` (a dict keyed by the header names) and flush to disk."""
        self.writer.writerow(data)
        # Flush per row so a power loss costs at most the current record.
        self.csv_file.flush()

    def close(self):
        """Close the underlying file; calling it more than once is harmless."""
        self.csv_file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
class Database(WriterInterface):
    """Writer that inserts every complete record into the "sensordata" table."""

    # Parameterised statement; the driver binds the values.
    _INSERT_SQL = (
        'insert into "sensordata" (created_time, temperature, humidity, '
        "light, occupancy, people_count) values (%s, %s, %s, %s, %s, %s);"
    )
    # Record keys in placeholder order ("motion_count" feeds people_count).
    _INSERT_KEYS = (
        "timestamp",
        "temperature",
        "humidity",
        "light",
        "occupancy",
        "motion_count",
    )

    def __init__(self):
        """Connect using the "host", "db", "user" and "passwd" entries of ``cfg.postgres``."""
        # Imported locally: the module-level import is commented out, so the
        # name would otherwise be undefined when this class is instantiated.
        import psycopg2

        self.con = psycopg2.connect(
            host=cfg.postgres["host"],
            dbname=cfg.postgres["db"],
            user=cfg.postgres["user"],
            password=cfg.postgres["passwd"],
        )

    def write(self, data):
        """Insert one record; records containing ``None`` are reported and skipped.

        Errors are printed rather than raised so that one failed insert
        does not stop the caller.
        """
        if None in data.values():
            print(f"problem with data:\n{data}")
            return
        try:
            params = tuple(data[key] for key in self._INSERT_KEYS)
            cur = self.con.cursor()
            try:
                cur.execute(self._INSERT_SQL, params)
                self.con.commit()
            except Exception:
                # Without a rollback the connection stays in a failed
                # transaction and every later insert is rejected too.
                self.con.rollback()
                raise
            finally:
                cur.close()
        except Exception as e:
            print(e)

    def close(self):
        """Close the database connection."""
        self.con.close()
class DataCollector:
    """Sample the sensors periodically, track occupancy and pass each record to a writer."""

    def __init__(self, s_connector, writer, poll_interval=60, vacancy_timeout=900):
        """Set up the collector.

        Args:
            s_connector: Object providing ``pin_pir`` and ``read_sensors()``.
            writer: Object providing ``write(data)``.
            poll_interval: Seconds to sleep between two samples.
            vacancy_timeout: Seconds without motion after which the room
                is considered vacant.
        """
        self.s_connector = s_connector
        self.writer = writer
        self.poll_interval = poll_interval
        self.vacancy_timeout = vacancy_timeout
        self.ts_last_motion = None
        self.occupied_state = False
        self.motion_count = 0

    def motion_handler(self, channel):
        """GPIO edge callback: record the motion and mark the room occupied.

        Args:
            channel: Supplied by the GPIO library; not used here.
        """
        print("motion detected")
        self.ts_last_motion = datetime.datetime.now()
        self.occupied_state = True
        # NOTE(review): assumes every motion event is counted, not only the
        # vacant -> occupied transitions — confirm the intended meaning.
        self.motion_count += 1

    def refresh_occupancy(self, now=None):
        """Mark the room vacant when the last motion is older than ``vacancy_timeout``.

        Args:
            now: Reference time; defaults to ``datetime.datetime.now()``.
        """
        if not self.occupied_state or self.ts_last_motion is None:
            return
        if now is None:
            now = datetime.datetime.now()
        # total_seconds() includes whole days; .seconds would wrap at 24 h.
        idle_seconds = (now - self.ts_last_motion).total_seconds()
        if idle_seconds > self.vacancy_timeout:
            print("Room vacant at: ")
            print(time.strftime("%H"), ":", time.strftime("%M"))
            self.occupied_state = False

    def collect_data(self):
        """Register the PIR callback, then sample and write records forever."""
        GPIO.add_event_detect(
            self.s_connector.pin_pir, GPIO.RISING, callback=self.motion_handler
        )
        while True:
            self.refresh_occupancy(datetime.datetime.now())
            data = {
                **self.s_connector.read_sensors(),
                "occupancy": self.occupied_state,
                # Taken after the sensor reads, which can block for a while.
                "timestamp": datetime.datetime.now(),
                "motion_count": self.motion_count,
            }
            print(data)
            self.writer.write(data)
            time.sleep(self.poll_interval)
if __name__ == "__main__":
    # Wire the sensors to a CSV sink and start the endless sampling loop.
    sensors = SensorConnector(pin_pir=17, pin_dht22=4, i2c_settings=I2C_SETTINGS)
    sink = CSVWriter("/path/to/data.csv", CSV_HEADERS)
    # A Database() instance can be passed as the writer instead.
    collector = DataCollector(sensors, sink)
    collector.collect_data()
Is it ok to use an aggregation between an interface and a class?
是的,这是正确的方法
Do I have to draw aggregations between the classes that implement the interface and the class DataCollector?
不,有几个原因:
接口的目的就是隐藏具体的实现;如果与每个实现类都建立关系,就会暴露这些实现,并带来随之而来的所有后果。
以后可能会出现新的实现该接口的类,你不会希望每出现一个新类,就要修改 DataCollector、为它再添加一条关系。
DataCollector 只有一个 writer;如果画多条关系,它们之间就必须是互斥的,这样做徒增复杂度而毫无益处。