Zend framework 2 / Doctrine 2 / Bulk operations and events triggering

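For a huge project with a lot of entities, I wrote a generic save() method.

This method lives in an abstract service and is used throughout the project to save entity state.

AbstractService::save() looks like this: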

public function save($entity)
{
    $transactionStarted = $this->beginTransaction();

    try
    {
        $action = $entity->getId() ? self::UPDATE : self::CREATION;

        $this->getEventManager()->trigger('save.pre', $entity, ['action' => $action]);

        $this->getEntityManager()->persist($entity);
        $this->getEntityManager()->flush();

        $this->getEventManager()->trigger('save.post', $entity, ['action' => $action]);

        if ($transactionStarted)
        {
            $this->commitTransaction();
        }
    } catch (\Exception $e)
    {
        if ($transactionStarted)
        {
            $this->rollbackTransaction();
        }

        throw new Exception('Unable to save entity', $e);
    }

    return true;
}

public function beginTransaction()
{
    if (!$this->getEntityManager()->getConnection()->isTransactionActive())
    {
        $this->getEntityManager()->getConnection()->beginTransaction();

        return true;
    }

    return false;
}

public function commitTransaction()
{
    $this->getEntityManager()->getConnection()->commit();

    return $this;
}

public function rollbackTransaction()
{
    $this->getEntityManager()->getConnection()->rollBack();

    return $this;
}
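To make the ownership rule in beginTransaction()/save() concrete, here is a self-contained sketch that swaps the Doctrine connection for a hypothetical FakeConnection class (not a Doctrine API; it only tracks whether a transaction is open). It shows that a nested save, for example one triggered from a save.post listener, joins the outer transaction, so only the outermost call commits or rolls back.

```php
<?php
// Hypothetical in-memory stand-in for a DBAL connection: it only tracks
// whether a transaction is open and counts begin/commit/rollback calls.
class FakeConnection
{
    public $active = false;
    public $begins = 0;
    public $commits = 0;
    public $rollbacks = 0;

    public function isTransactionActive(): bool { return $this->active; }
    public function beginTransaction(): void { $this->active = true; $this->begins++; }
    public function commit(): void { $this->active = false; $this->commits++; }
    public function rollBack(): void { $this->active = false; $this->rollbacks++; }
}

// Same ownership rule as AbstractService::save(): open a transaction only if
// none is active, and only the caller that opened it commits or rolls back.
function saveInTransaction(FakeConnection $conn, callable $work): void
{
    $started = false;
    if (!$conn->isTransactionActive()) {
        $conn->beginTransaction();
        $started = true;
    }

    try {
        $work();
        if ($started) {
            $conn->commit();
        }
    } catch (\Exception $e) {
        if ($started) {
            $conn->rollBack();
        }
        throw $e;
    }
}

$conn = new FakeConnection();

// Outer save whose post-save work triggers a nested save (e.g. AnotherService::save())
saveInTransaction($conn, function () use ($conn) {
    saveInTransaction($conn, function () {
        // nested work joins the outer transaction
    });
});

echo "begins={$conn->begins} commits={$conn->commits}\n"; // begins=1 commits=1
```

If the nested work throws, the inner call does not touch the transaction and simply rethrows; the outer call, which opened it, rolls it back once.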

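In my case, when a member (a new Member entity) is inserted through the Member service (which extends AbstractService), a listener on the save.post event sends an email, for example. It can also carry on with another operation handled by another service, which in turn calls its own save method.

Example of the "child" MemberService::save() method: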

MemberService

public function save(Member $member)
{
    // some stuff, e.g set a property
    $member->setFirstName('John');

    return parent::save($member);
}

Example of an event listener:

$sharedEventManager->attach(MemberService::class, 'save.post', [$this, 'onMembersCreation']);

public function onMembersCreation(EventInterface $event)
{
    // send an email

    // anything else ... update another entity ... (call AnotherService::save() too) 
}

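This works great for a simple save process.

But now I want to mass-import a large number of members, including creations and updates. To do that, I read the Doctrine documentation on bulk imports (Doc here).

But how should I update my code to handle both "bulk saving" and "single saving" while keeping transactions safe and events firing?

Basically, I suggest you implement the Doctrine\Common\Collections\Collection interface, perhaps by extending ArrayCollection, and create a save method that does what the documentation tells you.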

<?php

use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\ORM\EntityManager;
use Zend\EventManager\EventManager;

// Naive version: delegates to AbstractService::save(), i.e. one flush (and transaction) per entity
class MyDirtyCollection extends ArrayCollection
{
    protected $service;

    public function __construct(AbstractService $abstractService, array $elements = [])
    {
        parent::__construct($elements);
        $this->service = $abstractService;
    }

    public function save()
    {
        foreach ($this as $entity) {
            $this->service->save($entity);
        }
    }
}

// Batch version: persist everything, flush() + clear() every $bulkSize entities
class MyCollection extends ArrayCollection
{
    const CREATION = 'creation';
    const UPDATE   = 'update';

    public $bulkSize = 500;

    protected $eventManager;
    protected $entityManager;

    public function __construct(EntityManager $entityManager, EventManager $eventManager, array $elements = [])
    {
        parent::__construct($elements);
        $this->entityManager = $entityManager;
        $this->eventManager  = $eventManager;
    }

    public function getEventManager()
    {
        return $this->eventManager;
    }

    public function getEntityManager()
    {
        return $this->entityManager;
    }

    public function setBulkSize(int $bulkSize)
    {
        $this->bulkSize = $bulkSize;
    }

    public function save()
    {
        $connection = $this->getEntityManager()->getConnection();

        // Only open (and later commit) a transaction if none is active yet
        $transactionStarted = false;
        if (!$connection->isTransactionActive()) {
            $connection->beginTransaction();
            $transactionStarted = true;
        }

        try {
            // Decide CREATION/UPDATE before flushing: after flush() every entity has an id
            $actions = [];
            foreach ($this as $key => $entity) {
                $actions[$key] = $entity->getId() ? self::UPDATE : self::CREATION;
                $this->getEventManager()->trigger('save.pre', $entity, ['action' => $actions[$key]]);
            }

            $i = 0;
            foreach ($this as $entity) {
                $i++;

                // Caution: clear() detaches every managed entity, so an already-loaded entity
                // persisted after it is treated as new rather than updated (see merge() further down)
                $this->getEntityManager()->persist($entity);

                if (($i % $this->bulkSize) === 0) {
                    $this->getEntityManager()->flush();
                    $this->getEntityManager()->clear();
                }
            }

            $this->getEntityManager()->flush();
            $this->getEntityManager()->clear();

            foreach ($this as $key => $entity) {
                $this->getEventManager()->trigger('save.post', $entity, ['action' => $actions[$key]]);
            }

            if ($transactionStarted) {
                $connection->commit();
            }
        } catch (\Exception $e) {
            if ($transactionStarted) {
                $connection->rollBack();
            }

            throw $e;
        }
    }
}
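One point that is easy to miss in the collection's save(): flush() assigns identifiers to new entities, so deciding CREATION vs UPDATE with getId() after flushing reports everything as an update. The pure-PHP sketch below uses a hypothetical FakeEntity class and fakeFlush() function (stand-ins, not Doctrine classes) to show why the action should be recorded per entity before flushing.

```php
<?php
// Hypothetical entity: getId() returns null until a "flush" assigns an id.
class FakeEntity
{
    private $id;
    public function __construct(?int $id = null) { $this->id = $id; }
    public function getId(): ?int { return $this->id; }
    public function assignId(int $id): void { $this->id = $id; }
}

const CREATION = 'creation';
const UPDATE = 'update';

// Simulates flush(): every entity without an id receives the next id.
function fakeFlush(array $entities, int &$nextId): void
{
    foreach ($entities as $entity) {
        if ($entity->getId() === null) {
            $entity->assignId($nextId++);
        }
    }
}

$entities = [new FakeEntity(7), new FakeEntity(), new FakeEntity()];
$nextId = 100;

// Record the action per key BEFORE flushing...
$actions = [];
foreach ($entities as $key => $entity) {
    $actions[$key] = $entity->getId() ? UPDATE : CREATION;
}

fakeFlush($entities, $nextId);

// ...because recomputing it afterwards reports every entity as an update.
$recomputed = [];
foreach ($entities as $key => $entity) {
    $recomputed[$key] = $entity->getId() ? UPDATE : CREATION;
}

print_r($actions);    // update, creation, creation
print_r($recomputed); // update, update, update
```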

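Something like that ;) When you fetch your data, hydrate your collection, process your entities, and finally call $collection->save();

EDIT: added the Insert class and a use case below:

Performance will still be modest here, but it is better than committing entities one by one. If you need high performance, though, consider using Doctrine DBAL instead of the ORM. Here is my DBAL class for bulk inserts: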
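The gain comes from sending many rows in a single multi-row INSERT statement instead of one statement per row. A minimal sketch of that idea, assuming a hypothetical buildMultiRowInsert() helper: unlike the Insert class, which inlines values with expr()->literal(), this variant emits positional ? placeholders plus a flat parameter list, which could then be passed as the parameters of a DBAL 2.x connection's executeUpdate().

```php
<?php
// Hypothetical helper: builds one INSERT for several rows that share the same columns.
// Returns [sql, params] with positional placeholders instead of inlined literals.
// Table and column names are interpolated as-is, so they must come from trusted code.
function buildMultiRowInsert(string $table, array $rows): array
{
    if (count($rows) === 0) {
        throw new UnderflowException('No values ready for INSERT');
    }

    $columns = array_keys(reset($rows));
    $rowPlaceholder = '(' . implode(', ', array_fill(0, count($columns), '?')) . ')';

    $params = [];
    foreach ($rows as $row) {
        // Use the column order of the first row for every row
        foreach ($columns as $column) {
            $params[] = $row[$column];
        }
    }

    $sql = sprintf(
        'INSERT INTO %s (%s) VALUES %s',
        $table,
        implode(', ', $columns),
        implode(', ', array_fill(0, count($rows), $rowPlaceholder))
    );

    return [$sql, $params];
}

[$sql, $params] = buildMultiRowInsert('member', [
    ['first_name' => 'John', 'last_name' => 'Doe'],
    ['first_name' => 'Jane', 'last_name' => 'Roe'],
]);

echo $sql, "\n";
// INSERT INTO member (first_name, last_name) VALUES (?, ?), (?, ?)
```

Placeholders let the driver handle quoting; the trade-off is that databases cap the number of bound parameters per statement, which is one more reason to keep batches bounded.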

<?php

namespace JTH\Doctrine\DBAL;

use Doctrine\DBAL\Query\QueryBuilder;
use Exception;
use InvalidArgumentException;
use Traversable;
use UnderflowException;

class Insert extends QueryBuilder
{
    const CALLBACK_FAILURE_SKIP = 0;
    const CALLBACK_FAILURE_BREAK = 1;

    protected $callbackFailureStrategy = self::CALLBACK_FAILURE_BREAK;

    public static $defaultBulkSize = 500;

    public $ignore = false;
    public $onDuplicate = null;

    public function values(array $values)
    {
        $this->resetQueryPart('values');
        $this->addValues($values);
    }

    public function addValues(array $values)
    {
        $this->add('values', $values, true);
    }

    public function setCallbackFailureStrategy($strategy)
    {
        if ($strategy == static::CALLBACK_FAILURE_BREAK) {
            $this->callbackFailureStrategy = static::CALLBACK_FAILURE_BREAK;
        } elseif ($strategy == static::CALLBACK_FAILURE_SKIP) {
            $this->callbackFailureStrategy = static::CALLBACK_FAILURE_SKIP;
        } else {
            $class = self::class;
            throw new InvalidArgumentException(
                "Invalid failure behaviour. See $class::CALLBACK_FAILURE_SKIP and $class::CALLBACK_FAILURE_BREAK"
            );
        }
    }

    public function getCallbackFailureStrategy()
    {
        return $this->callbackFailureStrategy;
    }

    public function execute()
    {
        return $this->getConnection()->executeUpdate(
            $this->getSQLForInsert(),
            $this->getParameters(),
            $this->getParameterTypes()
        );
    }

    /**
     * Converts this instance into an INSERT string in SQL.
     * @return string
     * @throws \Exception
     */
    private function getSQLForInsert()
    {
        $count = sizeof($this->getQueryPart('values'));

        if ($count == 0) {
            throw new UnderflowException("No values ready for INSERT");
        }

        $values = current($this->getQueryPart('values'));
        $ignore = $this->ignore ? 'IGNORE' : '' ;
        $sql = "INSERT $ignore INTO " . $this->getQueryPart('from')['table'] .
            ' (' . implode(', ', array_keys($values)) . ')' . ' VALUES ';

        foreach ($this->getQueryPart('values') as $values) {
            $sql .= '(' ;

            foreach ($values as $value) {
                if (is_array($value)) {
                    if (!empty($value['raw'])) {
                        $sql .= $value['value'] . ',';
                    } else {
                        $sql .= $this->expr()->literal($value['value'], $value['type'] ?? null) . ',';
                    }
                } else {
                    $sql .= $this->expr()->literal($value) . ',';
                }
            }

            $sql = substr($sql, 0, -1);
            $sql .= '),';
        }

        $sql = substr($sql, 0, -1);

        if (!is_null($this->onDuplicate)) {
            $sql .= ' ON DUPLICATE KEY UPDATE ' . $this->onDuplicate . ' ';
        }

        return $sql;
    }

    /**
     * @param $loopable array | Traversable An array or object to loop over
     * @param $callable Callable A callable that will be called before actually insert the row.
     * two parameters will be passed :
     * - the key of the current row
     * - the row values (Array)
     * An array of rows to insert must be returned
     * @param $bulkSize int How many rows will be inserted at once
     * @param bool $transactionnal
     * @throws \Doctrine\DBAL\ConnectionException
     * @throws \Exception
     */
    public function bulk($loopable, callable $callable, $bulkSize = null, $transactionnal = true)
    {
        if (!is_array($loopable) && !($loopable instanceof Traversable)) {
            throw new InvalidArgumentException('$loopable must be either an array or a traversable object');
        }

        $bulkSize = $bulkSize ?? static::$defaultBulkSize;

        $this->getConnection()->getConfiguration()->setSQLLogger(null); // Avoid MonoLog memory overload

        if ($transactionnal) {
            $this->getConnection()->beginTransaction();
        }

        $this->resetQueryPart('values');

        try {
            foreach ($loopable as $key => $values) {
                try {
                    $callbackedValues = $callable($key, $values);

                    // The callback returns an array of rows (possibly empty)
                    foreach ((array) $callbackedValues as $callbackedValuesRow) {
                        $this->addValues($callbackedValuesRow);
                    }
                } catch (Exception $e) {
                    /*
                     * If a callback exception must break the transaction, rethrow it
                     * (the outer catch rolls back). Else, skip the row insertion.
                     */
                    if ($this->callbackFailureStrategy == static::CALLBACK_FAILURE_BREAK) {
                        throw $e;
                    }

                    continue;
                }

                // Send a full batch as one multi-row INSERT
                if (count($this->getQueryPart('values')) >= $bulkSize) {
                    $this->execute();
                    $this->resetQueryPart('values');
                }
            }

            // Insert the remaining rows of the last, partial batch
            if (count($this->getQueryPart('values')) > 0) {
                $this->execute();
            }

            $this->resetQueryPart('values');

            if ($transactionnal) {
                $this->getConnection()->commit();
            }
        } catch (Exception $e) {
            // Undo everything inserted so far, then propagate the error
            $this->resetQueryPart('values');

            if ($transactionnal) {
                $this->getConnection()->rollBack();
            }

            throw $e;
        }
    }

    /**
     * @return boolean
     */
    public function isIgnore()
    {
        return $this->ignore;
    }

    /**
     * @param boolean $ignore
     */
    public function setIgnore(bool $ignore)
    {
        $this->ignore = $ignore;
    }

    /**
     * @return null|string
     */
    public function getOnDuplicate()
    {
        return $this->onDuplicate;
    }

    /**
     * @param string|null $onDuplicate Raw SQL appended after ON DUPLICATE KEY UPDATE (MySQL); disables IGNORE
     */
    public function setOnDuplicate($onDuplicate)
    {
        $this->onDuplicate = $onDuplicate;
        $this->ignore = false;
    }


}
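The interplay between the callback, the failure strategy and the batch size in bulk() can be checked without a database. The sketch below is a hypothetical, simplified re-implementation (bulkBatches() is not part of the class) that collects the batches instead of executing them: with CALLBACK_FAILURE_SKIP an invalid item is dropped, with CALLBACK_FAILURE_BREAK the exception propagates and stops the import.

```php
<?php
const CALLBACK_FAILURE_SKIP = 0;
const CALLBACK_FAILURE_BREAK = 1;

// Hypothetical stand-in for Insert::bulk(): instead of executing SQL it records
// each batch that would be sent as one multi-row INSERT.
function bulkBatches(iterable $loopable, callable $callable, int $bulkSize, int $strategy): array
{
    $batches = [];
    $pending = [];

    foreach ($loopable as $key => $values) {
        try {
            foreach ((array) $callable($key, $values) as $row) {
                $pending[] = $row;
            }
        } catch (Exception $e) {
            if ($strategy === CALLBACK_FAILURE_BREAK) {
                throw $e; // propagate: abort the whole import
            }
            continue; // CALLBACK_FAILURE_SKIP: drop this item only
        }

        if (count($pending) >= $bulkSize) {
            $batches[] = $pending;
            $pending = [];
        }
    }

    if (count($pending) > 0) {
        $batches[] = $pending; // last, partial batch
    }

    return $batches;
}

// Callback that rejects negative values
$validate = function ($key, $value) {
    if ($value < 0) {
        throw new InvalidArgumentException("Row $key is invalid");
    }
    return [['value' => $value]];
};

$batches = bulkBatches([1, 2, -3, 4, 5], $validate, 2, CALLBACK_FAILURE_SKIP);
echo count($batches), "\n"; // 2 batches: values 1, 2 and values 4, 5
```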

Use case:

    try {
        $i = new Insert($this->getDoctrine()->getConnection('myDB'));
        $i->insert('myTable');
        $i->setOnDuplicate('col1 = VALUES(col1), updated_last = NOW()');
        $i->setCallbackFailureStrategy(Insert::CALLBACK_FAILURE_BREAK);
        $i->bulk($myArrayOfRows, function ($key, $row) {

            // Some pre-insert processing

            // Return the list of rows to insert for this item (here, just the row itself)
            return [$row];

        }, 500, true);

        $this->addFlash('success', 'Yay !');

    } catch (\Doctrine\DBAL\DBALException $e) {
        $this->addFlash('error', 'Damn, error : ' . $e->getMessage());
    }
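setOnDuplicate() takes a raw SQL fragment. When many columns should be refreshed on a duplicate key, that string can be generated from the column list; onDuplicateFor() below is a hypothetical helper, not part of the class. Note that VALUES(col) inside ON DUPLICATE KEY UPDATE is MySQL syntax, deprecated since MySQL 8.0.20 in favour of row aliases.

```php
<?php
// Hypothetical helper: builds the MySQL "ON DUPLICATE KEY UPDATE" assignment list
// expected by Insert::setOnDuplicate(), e.g. "col1 = VALUES(col1)".
// Column names are not escaped, so only pass trusted, hard-coded names.
function onDuplicateFor(array $columns, array $extra = []): string
{
    $assignments = [];
    foreach ($columns as $column) {
        $assignments[] = sprintf('%s = VALUES(%s)', $column, $column);
    }

    // Extra raw assignments such as "updated_last = NOW()" are appended as-is
    return implode(', ', array_merge($assignments, $extra));
}

echo onDuplicateFor(['col1'], ['updated_last = NOW()']), "\n";
// col1 = VALUES(col1), updated_last = NOW()
```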

In the end, I used Doctrine's merge() method, and it seems to work well.

I wrote a separate AbstractService::saveBulk() method to save a large number of Member entities, for example:

    /**
     * @param ArrayCollection $entities
     *
     * @return bool
     * @throws Exception
     */
    public function saveBulk(ArrayCollection $entities)
    {
        $batchSize = 100;
        $i         = 0;

        foreach ($entities as $entity)
        {
            $i++;
            $transactionStarted = $this->beginTransaction();

            try
            {
                $action = $entity->getId() ? self::UPDATE : self::CREATION;

                $this->getEventManager()->trigger('save.pre', $entity, ['action' => $action]);

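                // merge() returns a managed copy, re-attaching entities detached by clear()
                // (note: merge() is deprecated since Doctrine ORM 2.7)
                $entity = $this->getEntityManager()->merge($entity);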

                $this->getEntityManager()->persist($entity);
                $this->getEntityManager()->flush();

                $this->getEventManager()->trigger('save.post', $entity, ['action' => $action]);

                if (($i % $batchSize) === 0)
                {
                    $this->getEntityManager()->clear();
                }

                if ($transactionStarted)
                {
                    $this->commitTransaction();
                }
            } catch (\Exception $e)
            {
                if ($transactionStarted)
                {
                    $this->rollbackTransaction();
                }

                throw new Exception(Exception::UNEXPECTED_ERROR, 'Unable to save entity', $e);
            }
        }

        $this->getEntityManager()->clear();

        return true;
    }

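Unlike the Doctrine 2 documentation, which calls flush() + clear() once per batch, I flush after every entity and only call clear() once per batch, because some of the triggered events need to know whether the entity already has a database identifier.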
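The per-batch clear() only works if the counter advances: with $i left at 0, ($i % $batchSize) === 0 is true on every iteration, so clear() would run after each entity. A small pure-PHP check, where the hypothetical countClears() simply counts how often that condition fires inside the loop:

```php
<?php
// Counts how many times the "($i % $batchSize) === 0" branch would call clear()
// inside the loop, with or without incrementing the counter.
function countClears(int $entityCount, int $batchSize, bool $increment): int
{
    $i = 0;
    $clears = 0;

    for ($n = 0; $n < $entityCount; $n++) {
        if ($increment) {
            $i++;
        }

        if (($i % $batchSize) === 0) {
            $clears++;
        }
    }

    return $clears;
}

echo countClears(250, 100, false), "\n"; // 250: the condition is always true
echo countClears(250, 100, true), "\n";  // 2: after entities 100 and 200
```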

@JesusTheHun, thank you, your comments helped me a lot.