Zend framework 2 / Doctrine 2 / Bulk operations and events triggering
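For a huge project with many entities, I wrote a generic save() method.
This method lives in an abstract service and is used throughout the project to save entity state.
AbstractService::save() looks like this: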
public function save($entity)
{
    $transactionStarted = $this->beginTransaction();
    try
    {
        $action = $entity->getId() ? self::UPDATE : self::CREATION;
        $this->getEventManager()->trigger('save.pre', $entity, ['action' => $action]);
        $this->getEntityManager()->persist($entity);
        $this->getEntityManager()->flush();
        $this->getEventManager()->trigger('save.post', $entity, ['action' => $action]);
        if ($transactionStarted)
        {
            $this->commitTransaction();
        }
    } catch (\Exception $e)
    {
        if ($transactionStarted)
        {
            $this->rollbackTransaction();
        }
        // The previous exception is the 3rd constructor argument ($message, $code, $previous)
        throw new \RuntimeException('Unable to save entity', 0, $e);
    }
    return true;
}

// Starts a transaction only if none is active; returns true if this call started (and owns) it
public function beginTransaction()
{
    if (!$this->getEntityManager()->getConnection()->isTransactionActive())
    {
        $this->getEntityManager()->getConnection()->beginTransaction();
        return true;
    }
    return false;
}

public function commitTransaction()
{
    $this->getEntityManager()->getConnection()->commit();
    return $this;
}

public function rollbackTransaction()
{
    $this->getEntityManager()->getConnection()->rollBack();
    return $this;
}
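The point of beginTransaction() returning a boolean is that only the outermost save() owns the transaction, so a save.post listener that calls AnotherService::save() joins the same transaction instead of committing it early. A self-contained sketch of that ownership rule; FakeConnection and transactional() are hypothetical names, and only isTransactionActive(), beginTransaction(), commit() and rollBack() of a Doctrine connection are mimicked:

```php
<?php
// Hypothetical in-memory stand-in for a Doctrine DBAL connection; only the four
// methods used by AbstractService are mimicked.
final class FakeConnection
{
    public $depth = 0;      // number of open transactions
    public $commits = 0;
    public $rollbacks = 0;

    public function isTransactionActive(): bool { return $this->depth > 0; }
    public function beginTransaction(): void { $this->depth++; }
    public function commit(): void { $this->depth--; $this->commits++; }
    public function rollBack(): void { $this->depth--; $this->rollbacks++; }
}

// Runs $work in a transaction that only the outermost caller begins, commits or rolls back,
// the same ownership rule as beginTransaction()/commitTransaction() in AbstractService.
function transactional(FakeConnection $conn, callable $work)
{
    $started = false;
    if (!$conn->isTransactionActive()) {
        $conn->beginTransaction();
        $started = true;
    }
    try {
        $result = $work();
        if ($started) {
            $conn->commit();
        }
        return $result;
    } catch (\Throwable $e) {
        if ($started) {
            $conn->rollBack();
        }
        throw $e; // nested callers just propagate; the owner has already rolled back
    }
}

// An outer save() whose save.post listener calls another service's save():
$conn = new FakeConnection();
transactional($conn, function () use ($conn) {
    return transactional($conn, function () {
        return 'nested save joined the outer transaction';
    });
});
echo $conn->commits, PHP_EOL; // prints 1
```

If the nested save fails, the exception travels up to the outermost caller, which rolls back the whole unit of work exactly once.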
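In my case, when a member (a new Member entity) is inserted through the Member service (which extends AbstractService), an email is sent (for example) on the save.post event.
It could also trigger another action related to another service, which in turn calls its own save method.
Example of the "child" MemberService::save() method, in MemberService: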
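/**
 * @param Member $member
 * (no type hint: narrowing the parent's untyped save($entity) is a warning in PHP 7 and fatal in PHP 8)
 */
public function save($member)
{
    // some stuff, e.g. set a property
    $member->setFirstName('John');
    return parent::save($member);
}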
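Example of a listener for the triggered event:
$sharedEventManager->attach(MemberService::class, 'save.post', [$this, 'onMembersCreation']);

public function onMembersCreation(EventInterface $event)
{
    // save.post is triggered for updates too; the action is passed as an event parameter
    if ($event->getParam('action') !== MemberService::CREATION) {
        return;
    }
    // send an email
    // anything else ... update another entity ... (call AnotherService::save() too)
}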
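This works great for a simple save process.
But now I want to mass-import a large number of members, with both creations and updates. To do this, I read the Doctrine documentation on bulk imports. Doc here
But how do I properly update my code to handle both "bulk saving" and "single saving", while keeping transactions safe and events firing?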
Basically, I suggest you implement the Doctrine\Common\Collections\Collection interface (or simply extend ArrayCollection) and create a save method that does what the documentation tells you.
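<?php
use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\ORM\EntityManager;
use Zend\EventManager\EventManager;

// "Dirty" version: delegates to AbstractService::save(), i.e. one flush (and one transaction) per entity
class MyDirtyCollection extends ArrayCollection {
    protected $service;

    public function __construct(AbstractService $abstractService, array $elements = [])
    {
        parent::__construct($elements);
        $this->service = $abstractService;
    }

    public function save()
    {
        foreach ($this as $entity) {
            $this->service->save($entity);
        }
    }
}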
class MyCollection extends \Doctrine\Common\Collections\ArrayCollection {
    public $bulkSize = 500;
    protected $eventManager;
    protected $entityManager;

    public function __construct(EntityManager $entityManager, EventManager $eventManager, array $elements = [])
    {
        parent::__construct($elements);
        $this->entityManager = $entityManager;
        $this->eventManager = $eventManager;
    }

    public function getEventManager()
    {
        return $this->eventManager;
    }

    public function getEntityManager()
    {
        return $this->entityManager;
    }

    public function setBulkSize(int $bulkSize)
    {
        $this->bulkSize = $bulkSize;
    }

    public function save()
    {
        $connection = $this->getEntityManager()->getConnection();
        // Only begin (and later commit) a transaction if none is active, like AbstractService::beginTransaction()
        $transactionStarted = false;
        if (!$connection->isTransactionActive()) {
            $connection->beginTransaction();
            $transactionStarted = true;
        }
        try {
            // Decide the action BEFORE flushing: afterwards every entity has an ID.
            // Assumes UPDATE/CREATION are (public) constants of AbstractService.
            $actions = new \SplObjectStorage();
            foreach ($this as $entity) {
                $actions[$entity] = $entity->getId() ? AbstractService::UPDATE : AbstractService::CREATION;
                $this->getEventManager()->trigger('save.pre', $entity, ['action' => $actions[$entity]]);
            }
            $i = 0;
            foreach ($this as $entity) {
                $i++;
                $this->getEntityManager()->persist($entity);
                if (($i % $this->bulkSize) === 0) {
                    $this->getEntityManager()->flush();
                    $this->getEntityManager()->clear();
                }
            }
            // Flush the last, incomplete batch
            $this->getEntityManager()->flush();
            $this->getEntityManager()->clear();
            // Entities are detached at this point because of clear()
            foreach ($this as $entity) {
                $this->getEventManager()->trigger('save.post', $entity, ['action' => $actions[$entity]]);
            }
            if ($transactionStarted) {
                $connection->commit();
            }
        } catch (\Exception $e) {
            if ($transactionStarted) {
                $connection->rollBack();
            }
            throw $e;
        }
    }
}
Something like that ;)
When you fetch your data, you hydrate your collection, process your entities, and finally call $collection->save();
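The subtle part of a batched save is that the action (creation or update) has to be decided before flush(), because afterwards every entity has an identifier. A self-contained sketch of that flow, where Member, FakeEntityManager and saveInBatches() are hypothetical stand-ins rather than Doctrine classes (the fake flush() simply hands out IDs the way the database would):

```php
<?php
// Hypothetical entity with a database-generated ID.
final class Member
{
    private $id;
    public function __construct($id = null) { $this->id = $id; }
    public function getId() { return $this->id; }
    public function assignId(int $id): void { $this->id = $id; }
}

// Hypothetical stand-in for EntityManager: only persist(), flush() and clear() are mimicked.
final class FakeEntityManager
{
    public $flushes = 0;
    public $clears = 0;
    private $pending = [];
    private $nextId = 1000;

    public function persist(Member $m): void { $this->pending[] = $m; }
    public function flush(): void
    {
        foreach ($this->pending as $m) {
            if ($m->getId() === null) {
                $m->assignId($this->nextId++); // new rows get their identifier on flush
            }
        }
        $this->pending = [];
        $this->flushes++;
    }
    public function clear(): void { $this->clears++; }
}

/**
 * Saves $entities in batches and returns the events that would be triggered, as [event, action] pairs.
 * Actions are stored per object in an SplObjectStorage before any flush().
 */
function saveInBatches(FakeEntityManager $em, array $entities, int $batchSize): array
{
    $events = [];
    $actions = new SplObjectStorage();
    foreach ($entities as $entity) {
        $actions[$entity] = $entity->getId() ? 'update' : 'create';
        $events[] = ['save.pre', $actions[$entity]];
    }
    $i = 0;
    foreach ($entities as $entity) {
        $em->persist($entity);
        if ((++$i % $batchSize) === 0) {
            $em->flush();
            $em->clear();
        }
    }
    $em->flush(); // remaining entities of the last, partial batch
    $em->clear();
    foreach ($entities as $entity) {
        $events[] = ['save.post', $actions[$entity]];
    }
    return $events;
}

$em = new FakeEntityManager();
$members = [new Member(), new Member(7), new Member()];
$events = saveInBatches($em, $members, 2);
```

With a batch size of 2, three members produce one full-batch flush plus the final one, and the save.post events still report "create" for the two new members. In real Doctrine, clear() also detaches every managed entity, so an existing entity loaded before a clear() and persisted after it is no longer tracked as managed; that is one reason merge() comes up for updates in this situation.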
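Edit: added the Insert class and a use case below.
Performance here will be limited, but still better than committing entities one by one. However, if you are looking for high performance, you should consider using Doctrine DBAL instead of the ORM. Here is my DBAL class for bulk inserts: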
<?php
namespace JTH\Doctrine\DBAL;

use Doctrine\DBAL\Query\QueryBuilder;
use Exception;
use InvalidArgumentException;
use Traversable;
use UnderflowException;

class Insert extends QueryBuilder
{
    const CALLBACK_FAILURE_SKIP = 0;
    const CALLBACK_FAILURE_BREAK = 1;

    protected $callbackFailureStrategy = self::CALLBACK_FAILURE_BREAK;
    public static $defaultBulkSize = 500;
    public $ignore = false;
    public $onDuplicate = null;

    /** Replaces all pending rows with a single row (column => value) */
    public function values(array $values)
    {
        $this->resetQueryPart('values');
        return $this->addValues($values);
    }

    /** Appends one row (column => value) to the pending rows */
    public function addValues(array $values)
    {
        return $this->add('values', $values, true);
    }

    public function setCallbackFailureStrategy($strategy)
    {
        if ($strategy == static::CALLBACK_FAILURE_BREAK) {
            $this->callbackFailureStrategy = static::CALLBACK_FAILURE_BREAK;
        } elseif ($strategy == static::CALLBACK_FAILURE_SKIP) {
            $this->callbackFailureStrategy = static::CALLBACK_FAILURE_SKIP;
        } else {
            $class = self::class;
            throw new InvalidArgumentException(
                "Invalid failure behaviour. See $class::CALLBACK_FAILURE_SKIP and $class::CALLBACK_FAILURE_BREAK"
            );
        }
        return $this;
    }

    public function getCallbackFailureStrategy()
    {
        return $this->callbackFailureStrategy;
    }

    public function execute()
    {
        return $this->getConnection()->executeUpdate(
            $this->getSQLForInsert(),
            $this->getParameters(),
            $this->getParameterTypes()
        );
    }

    /**
     * Converts this instance into a multi-row INSERT string in SQL.
     * Column names are taken from the first row, so every row must use the same keys in the same order.
     * @return string
     * @throws UnderflowException When no rows have been added
     */
    private function getSQLForInsert()
    {
        $rows = $this->getQueryPart('values');
        if (count($rows) == 0) {
            throw new UnderflowException("No values ready for INSERT");
        }
        $ignore = $this->ignore ? 'IGNORE ' : '';
        $sql = "INSERT {$ignore}INTO " . $this->getQueryPart('from')['table']
            . ' (' . implode(', ', array_keys(current($rows))) . ') VALUES ';
        $tuples = [];
        foreach ($rows as $values) {
            $literals = [];
            foreach ($values as $value) {
                if (is_array($value)) {
                    // ['raw' => true, 'value' => 'NOW()'] is emitted verbatim,
                    // ['value' => ..., 'type' => ...] is quoted with the given type
                    $literals[] = !empty($value['raw'])
                        ? $value['value']
                        : $this->expr()->literal($value['value'], $value['type'] ?? null);
                } elseif ($value === null) {
                    $literals[] = 'NULL'; // quoting null would produce '' instead of NULL
                } else {
                    $literals[] = $this->expr()->literal($value);
                }
            }
            $tuples[] = '(' . implode(',', $literals) . ')';
        }
        $sql .= implode(',', $tuples);
        if (!is_null($this->onDuplicate)) {
            $sql .= ' ON DUPLICATE KEY UPDATE ' . $this->onDuplicate;
        }
        return $sql;
    }

    /**
     * @param array|Traversable $loopable An array or object to loop over
     * @param callable $callable Called before actually inserting the row, with two parameters:
     *                           - the key of the current row
     *                           - the row values (array)
     *                           It must return an array of rows to insert (possibly empty)
     * @param int|null $bulkSize How many rows will be inserted at once
     * @param bool $transactional Whether to wrap the whole import in a single transaction
     * @throws \Doctrine\DBAL\ConnectionException
     * @throws \Exception
     */
    public function bulk($loopable, callable $callable, $bulkSize = null, $transactional = true)
    {
        if (!is_array($loopable) && !($loopable instanceof Traversable)) {
            throw new InvalidArgumentException('$loopable must be either an array or a traversable object');
        }
        $bulkSize = $bulkSize ?? static::$defaultBulkSize;
        $this->getConnection()->getConfiguration()->setSQLLogger(null); // Avoid Monolog memory overload
        if ($transactional) {
            $this->getConnection()->beginTransaction();
        }
        try {
            $this->resetQueryPart('values');
            foreach ($loopable as $key => $values) {
                try {
                    foreach ($callable($key, $values) as $callbackedValuesRow) {
                        $this->addValues($callbackedValuesRow);
                    }
                } catch (Exception $e) {
                    // BREAK: rethrow (the outer catch rolls back); SKIP: ignore this row
                    if ($this->callbackFailureStrategy == static::CALLBACK_FAILURE_BREAK) {
                        throw $e;
                    }
                    continue;
                }
                if (count($this->getQueryPart('values')) >= $bulkSize) {
                    $this->execute();
                    $this->resetQueryPart('values');
                }
            }
            if (count($this->getQueryPart('values')) > 0) {
                $this->execute(); // last, partial batch
            }
            $this->resetQueryPart('values');
            if ($transactional) {
                $this->getConnection()->commit();
            }
        } catch (Exception $e) {
            $this->resetQueryPart('values');
            if ($transactional) {
                $this->getConnection()->rollBack(); // never leave the transaction open
            }
            throw $e;
        }
    }

    /**
     * @return boolean
     */
    public function isIgnore()
    {
        return $this->ignore;
    }

    /**
     * @param boolean $ignore
     */
    public function setIgnore(bool $ignore)
    {
        $this->ignore = $ignore;
    }

    /**
     * @return string|null
     */
    public function getOnDuplicate()
    {
        return $this->onDuplicate;
    }

    /**
     * @param string|null $onDuplicate Assignments for ON DUPLICATE KEY UPDATE, or null to disable it
     */
    public function setOnDuplicate($onDuplicate)
    {
        $this->onDuplicate = $onDuplicate;
        $this->ignore = false; // setting ON DUPLICATE KEY UPDATE turns INSERT IGNORE off
    }
}
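The class above builds its SQL by quoting every value into the statement with expr()->literal(). A swapped-in alternative (not what the class does) is to emit positional placeholders and bind the values separately. A minimal sketch; buildMultiRowInsert() is a hypothetical helper, and since column names are still concatenated into the SQL they must come from trusted code:

```php
<?php
/**
 * Builds one multi-row INSERT with positional placeholders plus the flat list of values to bind,
 * e.g. INSERT INTO member (first_name, email) VALUES (?, ?), (?, ?)
 */
function buildMultiRowInsert(string $table, array $rows): array
{
    if ($rows === []) {
        throw new UnderflowException('No values ready for INSERT');
    }
    $columns = array_keys(reset($rows)); // column list taken from the first row
    $rowPlaceholder = '(' . implode(', ', array_fill(0, count($columns), '?')) . ')';
    $params = [];
    foreach ($rows as $row) {
        foreach ($columns as $column) {
            if (!array_key_exists($column, $row)) {
                throw new InvalidArgumentException("Row is missing column $column");
            }
            $params[] = $row[$column]; // values are bound, never concatenated into the SQL
        }
    }
    $sql = sprintf(
        'INSERT INTO %s (%s) VALUES %s',
        $table,
        implode(', ', $columns),
        implode(', ', array_fill(0, count($rows), $rowPlaceholder))
    );
    return [$sql, $params];
}

[$sql, $params] = buildMultiRowInsert('member', [
    ['first_name' => 'John', 'email' => 'john@example.com'],
    ['first_name' => 'Jane', 'email' => 'jane@example.com'],
]);
```

The resulting pair could be handed to the same executeUpdate($sql, $params) call the class already uses. Because the number of placeholders grows with rows times columns and some databases cap placeholders per statement, the batch size may need to be smaller than with quoted literals.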
Use case:
try {
    $insert = new Insert($this->getDoctrine()->getConnection('myDB'));
    $insert->insert('myTable');
    $insert->setOnDuplicate('col1 = VALUES(col1), updated_last = NOW()');
    $insert->setCallbackFailureStrategy(Insert::CALLBACK_FAILURE_BREAK);
    $insert->bulk($myArrayOfRows, function ($key, $row) {
        // Some pre-insert processing
        return [$row]; // one input row becomes one inserted row
    }, 500, true);
    $this->addFlash('success', 'Yay !');
} catch (\Doctrine\DBAL\DBALException $e) {
    $this->addFlash('error', 'Damn, error : ' . $e->getMessage());
}
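The batching in bulk(), and the difference between the two callback failure strategies, can be checked without a database. A self-contained sketch of the same loop; bulkLoop(), the global constants and the $execute callback (standing in for Insert::execute()) are hypothetical names:

```php
<?php
const CALLBACK_FAILURE_SKIP = 0;
const CALLBACK_FAILURE_BREAK = 1;

/**
 * Same control flow as Insert::bulk(): each input row goes through $callable, which returns
 * zero or more rows; pending rows are passed to $execute every $bulkSize rows,
 * then once more for the remainder.
 */
function bulkLoop(iterable $loopable, callable $callable, callable $execute, int $bulkSize, int $strategy): void
{
    $pending = [];
    foreach ($loopable as $key => $values) {
        try {
            foreach ($callable($key, $values) as $row) {
                $pending[] = $row;
            }
        } catch (Exception $e) {
            if ($strategy === CALLBACK_FAILURE_BREAK) {
                throw $e;
            }
            continue; // CALLBACK_FAILURE_SKIP: drop this input row
        }
        if (count($pending) >= $bulkSize) {
            $execute($pending);
            $pending = [];
        }
    }
    if ($pending !== []) {
        $execute($pending); // last, partial batch
    }
}

// 1200 input rows, every 100th one rejected by the callback, batches of 500.
$batchSizes = [];
bulkLoop(
    range(1, 1200),
    function ($key, $n) {
        if ($n % 100 === 0) {
            throw new RuntimeException("row $n rejected");
        }
        return [['n' => $n]];
    },
    function (array $rows) use (&$batchSizes) { $batchSizes[] = count($rows); },
    500,
    CALLBACK_FAILURE_SKIP
);
```

With SKIP, the 12 rejected rows are dropped and the remaining 1188 are sent as batches of 500, 500 and 188. With BREAK, the first rejected row aborts the whole loop, which is why bulk() should roll back its transaction when that happens.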
In the end, I used Doctrine's merge method, and it seems to work well.
I wrote a separate AbstractService::saveBulk() method to save a large number of Member entities, like this:
/**
 * @param ArrayCollection $entities
 *
 * @return bool
 * @throws \RuntimeException
 */
public function saveBulk(ArrayCollection $entities)
{
    $batchSize = 100;
    $i = 0;
    foreach ($entities as $entity)
    {
        $i++; // without this, 0 % $batchSize === 0 and clear() would run for every entity
        $transactionStarted = $this->beginTransaction();
        try
        {
            $action = $entity->getId() ? self::UPDATE : self::CREATION;
            $this->getEventManager()->trigger('save.pre', $entity, ['action' => $action]);
            // merge() returns a managed copy, which also works for entities detached by clear()
            $entity = $this->getEntityManager()->merge($entity);
            $this->getEntityManager()->persist($entity);
            // flush per entity so save.post listeners see the generated identifier
            $this->getEntityManager()->flush();
            $this->getEventManager()->trigger('save.post', $entity, ['action' => $action]);
            if (($i % $batchSize) === 0)
            {
                // free memory once per batch
                $this->getEntityManager()->clear();
            }
            if ($transactionStarted)
            {
                $this->commitTransaction();
            }
        } catch (\Exception $e)
        {
            if ($transactionStarted)
            {
                $this->rollbackTransaction();
            }
            throw new \RuntimeException('Unable to save entity', 0, $e);
        }
    }
    $this->getEntityManager()->clear();
    return true;
}
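Unlike the Doctrine 2 documentation, which calls flush() + clear() once per batch, I flush after every entity and only call clear() once per batch, because some of the triggered events need to know whether the entity already has a database identifier.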
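The once-per-batch clear() only works if the loop counter actually advances. A quick self-contained check of the modulo condition; countClears() is a hypothetical helper, not part of the service:

```php
<?php
/**
 * Counts how many times clear() would run in the saveBulk() loop for $total entities.
 * With $increment = false the counter stays at 0, and because 0 % $batchSize === 0,
 * the "once per batch" clear() runs for every entity.
 */
function countClears(int $total, int $batchSize, bool $increment): int
{
    $clears = 0;
    $i = 0;
    for ($n = 0; $n < $total; $n++) {
        if ($increment) {
            $i++;
        }
        if (($i % $batchSize) === 0) {
            $clears++; // stands in for $this->getEntityManager()->clear()
        }
    }
    return $clears;
}
```

For 1000 entities and a batch size of 100, the incremented counter yields 10 clears, while a counter stuck at 0 yields 1000.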
@JesusTheHun Thanks, your comments helped me a lot.