From SQL jams to pipeline style php

Posted by 每特17劃 on 2017-07-28

From SQL jams to pipeline style php

從 SQL 醬缸轉換為 pipeline 式的 PHP

在處理大量複雜訂單的操作時,常常混雜各式各樣的條件跟運作,例如:

  • A. 找出金額大於某個數字,並且下單時間在某個優惠時段的訂單,予以 N % 的紅利點數
  • B. 找出下單時間在某個優惠時段的訂單,予以 N % 的折扣
  • C. 找出金額大於某個數字,並且排除某個優惠時段的訂單,將訂單總金額某上某個比率,計入該消費者的總積分

依一般最常見的 PHP + MySQL 的作法,若有 100 條抓資料的操作,大概就會寫成 100 份(或更多) SQL 語法指令,並依對應的 MySQL 條件式組合。於是便會有一堆 WHERE / AND / OR / GROUP BY / HAVING / subquery / … 等交錯的組合結構。

在此,我先將這稱之為 SQL-driven 的開發方式。

因為 SQL 的語法指令本身不易模組化。所以兩個類似的操作,常常就是從某一個 SQL 指令複製貼上到另一個,再小改一部份。 這種開發程式的方式,在初期很快很方便,能迅速增加功能跟解決眼前的小任務。但隨操作的種類增加、需求多元變化,複雜度不斷往上疊加後,漸漸會遇到下述困境:

  1. 其中一處抓資料的 SQL + PHP 指令組合有修改。從該處複製衍生出去的指令程式,也都要跟著修改。
  2. 但修改前後可能是不同工程師,後手不見得知道有那些地方要跟著更新,也不知那些地方不能改。
  3. 有改動到的地方,該功能很可能需要重新測試跟驗證。改越多地方,潛在的後座力越大。因此,多數人傾向只完成眼前任務所需的最少更改,而不會再進一步碰觸其他需要一併更新的 SQL + PHP 指令。
  4. 幾番迭代後,這堆 SQL + PHP 程式,很快就落入語法前後不一致,彼此相關,但又邏輯無法貫通的窘境。
  5. 中後期,程式碼維護越來越痛苦,開發過程中處處有地雷。
  6. 最終,在這程式碼中工作,就好像在玩疊疊樂(Jenga) ,每個人都在往上疊加複雜度跟風險。不時擔心會不會改了某段程式,就不小心踩到地雷而程式大爆炸。

會走入這種困境,其根本的原因之一在於,這種 SQL-driven 的開發方式 ,每個算法跟函式功能之間,不易重組利用。並非開發人員對 PHP 、 MYSQL 程式語言能力不夠,而是這種程式架構的發展方式本身有其侷限。加上很多專案開發只著重在不斷增加新功能,而幾乎沒有事後 code review 跟 refactor 的過程,很常會遇到這樣的困境。

那麼,除了上述 SQL-driven 外,是否還有其他的開發方式呢?

Pipeline style

在此,想介紹的另一種風格思維: Pipeline style。在過往的經歷中,曾受過啟發的地方如下:

  • Unix/Linux 上 Shell Script 普遍用到的 pipe ,例如:
    • cat /proc/cpuinfo | grep vendor_id | cut -d: -f2 | uniq
      • 由 4 個獨立指令組成,中間透過 | 運算元的串接,前一指令的輸出會變成到下一指令的輸入
    • 一個複雜的大任務,可分拆成小任務的組合,最終能用化約成個別獨立的指令的組合。
    • 指令能組合並產生各式各樣的任務。
  • Hadoop 所提出的 MapReduce 的新作法。
  • MongoDB 的 Map-Reduce 方法
  • OpenVanilla 的 Method chaining:

在上述的啟發下,也跟著尋思在 PHP 上是否也能實踐這樣的架構。之後研究了一下,發現目前 PHP 裡有下述機制可以使用:

  • MapReduce 相關的 functions
    • array_map()
    • array_reduce()
    • array_filter()
  • Anonymous functions ( v5.4 之後支援 )
    • Anonymous functions 其實在 v5.3 時就已出現,但不完整
    • v5.4 之後,始支援 object and class scope binding 。(註: Closure::bind ) 這之後才有辦法作到比較好的語法包裝。

於是便動手嘗試實作如下:

初步實作

假設有訂單的數據資料如下:

/*
 * Order states
 */
define("_NEW", 1);
define("_PENDING", 2);
define("_PROCESSING", 3);
define("_DONE", 4);

/*
 * Sample data
 */
$orders = array(
    ["order_id" => 1, "amount" => 1000, "bonus" => null, "date" => "2017-07-20T18:17:03+08:00", "state" => _PENDING],
    ["order_id" => 2, "amount" => 3000, "bonus" => null, "date" => "2017-07-21T21:38:24+08:00", "state" => _NEW],
    ["order_id" => 3, "amount" => 2500, "bonus" => null, "date" => "2017-07-22T09:41:51+08:00", "state" => _DONE],
    ["order_id" => 4, "amount" => 1800, "bonus" => null, "date" => "2017-07-23T13:24:31+08:00", "state" => _PENDING],
    ["order_id" => 5, "amount" => 1200, "bonus" => null, "date" => "2017-07-24T07:55:47+08:00", "state" => _DONE],
);

// Debug output
print_r($orders);

array_filter() 的特性,我們可以用來實作過濾函式如下:

依狀態(state)來過濾選取訂單

$filter_state = function ($state) {
    $res_func = function ($item) use ($state) {
        return $item["state"] == $state;
    };

    return $res_func;
};

// Apply the filter funtion on the order data
$result = array_filter($orders, $filter_state(_PENDING));

// Debug output
print_r($result);

依日期(date)來過濾選取訂單

/*
 * A filter function to filter by date
 */
$filter_date = function ($from, $to) {
    $res_func = function ($item) use ($from, $to) {
        return (strtotime($from) <= strtotime($item["date"]) && strtotime($item["date"]) <= strtotime($to));
    };

    return $res_func;
};

// Apply the filter funtion on the order data
$result = array_filter($orders, $filter_date("2017-07-21", "2017-07-23"));

// Debug output
print_r($result);

array_reduce() 的特性,我們可以用來實作計算函式如下:

計算資料集內某欄位的加總

/*
 * A function to calculate the sum of specified column
 */
$calculate_sum = function ($col) {
    $res_func = function ($carry, $item) use ($col) {
        $carry += $item[$col];
        return $carry;
    };
    return $res_func;
};

// Apply the calculate funtion on the order data
$result = array_reduce($orders, $calculate_sum("amount"));

// Debug output
print_r($result);

array_map() 的特性,我們可以用來實作附加函式如下:

計算每一條的 bonus 的點數

/*
 * A function to calculate the bonus point of the order
 */
$map_bonus =  function ($percent) {
    $res_func = function ($item) use ($percent) {
        $item["bonus"] += $item["amount"] * $percent / 100.0;
        return $item;
    };

    return $res_func;
};

// Apply the calculate funtion on the order data
$result = array_map($map_bonus(0.18), $orders);

// Debug output
print_r($result);

有了基本的操作組合之後,可依上述擬定的函式,可以將之組合成一連串的操作如下:

// filter by the date
$result1 = array_filter($orders, $filter_date("2017-07-21", "2017-07-23"));
// filter by the state
$result2 = array_filter($result1, $filter_state(_DONE));
// calculate the bonus points
$result3 = array_map($map_bonus(0.5), $result2);
// calculate the sum of bonus points
$result4 = array_reduce($result3, $calculate_sum("bonus"));

// Debug output
print_r(
    [
        "result1" => $result1,
        "result2" => $result2,
        "result3" => $result3,
        "result4" => $result4
    ]
);

略見雛型,但看起來沒什麼吸引力。

進一步改善: PipelineArray

首先,這樣的寫法少了 Method chaining 的特性,還不夠好用。 於是,我進一步設計了一個 PipelineArray 的 Class 如下:

class PipelineArray
{
    public function __construct($rows)
    {
        $this->rows = $rows;
    }

    public function from_rows($rows)
    {
        $this->rows = $rows;
    }

    public function map($func)
    {
        $this->rows = array_map($func, $this->rows);
        return $this;
    }

    public function filter($func)
    {
        $this->rows = array_filter($this->rows, $func);
        return $this;
    }

    public function reduce($func)
    {
        $this->rows = array_reduce($this->rows, $func);
        return $this;
    }

    public function to_rows()
    {
        return $this->rows;
    }

    public function debug($msg)
    {
        print_r(
            [
                "msg" => "[DEBUG]: $msg",
                "rows" => $this->rows
            ]
        );
        return $this;
    }
}

以上述的 PipelineArray 包裝之後,程式的寫法可改善如下:

$result = (new PipelineArray($orders))
    ->filter($filter_date("2017-07-21", "2017-07-23"))
    ->filter($filter_state(_DONE))
    ->debug("step 1")
    ->map($map_bonus(0.5))
    ->debug("step 2")
    ->reduce($calculate_sum("bonus"))
    ->to_rows(); // Output in original row format

// Debug output
print_r(
    [
        "result" => $result,
    ]
);

改寫後,寫法看起來比較簡單明白一些。 而有了 Method chaining 的元素之後,語法上也較容易重新組合功能操作函式。

進一步封裝: Order class

有了上述的基礎,我們可能再進一步封裝成 Order 的 class ,讓整體的操作更簡潔一點。像是:

  • 繼承 PipelineArray class, 如此可重複使用先前的成果
  • NEW, PENDING, … 等狀態常數封裝進 Order
  • 將語法 ->filter($filter_state(_DONE)) 精簡為 ->filter_state(_DONE)

回到訂單的部份,新架構的程式可改寫如下:

class Order extends PipelineArray
{

    const NEW = 1;
    const PENDING = 2;
    const PROCESSING = 3;
    const DONE = 4;

    public function filter_state($state)
    {
        return $this->filter(function ($item) use ($state) {
            return $item["state"] == $state;
        });
    }

    public function filter_date($from, $to)
    {
        return $this->filter(function ($item) use ($from, $to) {
            return (strtotime($from) <= strtotime($item["date"]) && strtotime($item["date"]) <= strtotime($to));
        });
    }

    public function calculate_sum($col)
    {
        return $this->reduce(function ($carry, $item) use ($col) {
            $carry += $item[$col];
            return $carry;
        });
    }

    public function map_bonus($percent)
    {
        return $this->map(function ($item) use ($percent) {
            $item["bonus"] += $item["amount"] * $percent / 100.0;
            return $item;
        });
    }

    public function getDoneBonusSum()
    {
        return $this
            ->filter_state(self::DONE)
            ->calculate_sum("bonus");
    }

    public function sample_run()
    {
        return $this
            ->filter_date("2017-07-21", "2017-07-23")
            ->map_bonus(0.5)
            ->getDoneBonusSum();
    }
}

使用時語法如下:

print_r(
    [
        "result" => (new Order($orders))->sample_run()->to_rows()
    ]
);

注意到,上述中的 sample_run() 的函式中,重複使用了 getDoneBonusSum() 的功能。而 getDoneBonusSum()sample_run() 也都是由先前的 filter_state(), filter_date(), calculate_sum(), map_bonus(), … 所疊加組合而成,都是未來可持續重複使用跟重組的函式。 依照這個方向,可再持續實作更多的基礎操作函式,並產生更多相互疊加組合的功能。逐一涵蓋所有訂單相關的資料操作。

至此,大致利用了 PHP 現有的機制,完成了一個 Pipeline style 的初步架構設計。

若能掌握了這個方向,在遇到不斷發膨脹成長的 SQL + PHP 的程式困境時,可嘗試將底層的 SQL-driven 程式碼,逐步重構改為 Pipeline style 的實作,慢慢往可重複使用的架構收斂。

附註

最終 sample 程式碼如下:

<?php


class PipelineArray
{
    public function __construct($rows)
    {
        $this->rows = $rows;
    }

    public function from_rows($rows)
    {
        $this->rows = $rows;
    }

    public function map($func)
    {
        $this->rows = array_map($func, $this->rows);
        return $this;
    }

    public function filter($func)
    {
        $this->rows = array_filter($this->rows, $func);
        return $this;
    }

    public function reduce($func)
    {
        $this->rows = array_reduce($this->rows, $func);
        return $this;
    }

    public function to_rows()
    {
        return $this->rows;
    }

    public function debug($msg)
    {
        print_r(
            [
                "msg" => "[DEBUG]: $msg",
                "rows" => $this->rows
            ]
        );
        return $this;
    }
}

class Order extends PipelineArray
{

    const NEW = 1;
    const PENDING = 2;
    const PROCESSING = 3;
    const DONE = 4;

    public function filter_state($state)
    {
        return $this->filter(function ($item) use ($state) {
            return $item["state"] == $state;
        });
    }

    public function filter_date($from, $to)
    {
        return $this->filter(function ($item) use ($from, $to) {
            return (strtotime($from) <= strtotime($item["date"]) && strtotime($item["date"]) <= strtotime($to));
        });
    }

    public function calculate_sum($col)
    {
        return $this->reduce(function ($carry, $item) use ($col) {
            $carry += $item[$col];
            return $carry;
        });
    }

    public function map_bonus($percent)
    {
        return $this->map(function ($item) use ($percent) {
            $item["bonus"] += $item["amount"] * $percent / 100.0;
            return $item;
        });
    }

    public function getDoneBonusSum()
    {
        return $this
            ->filter_state(self::DONE)
            ->calculate_sum("bonus");
    }

    public function sample_run()
    {
        return $this
            ->filter_date("2017-07-21", "2017-07-23")
            ->map_bonus(0.5)
            ->getDoneBonusSum();
    }
}

/*
 * Sample data
 */
$orders = array(
    ["order_id" => 1, "amount" => 1000, "bonus" => null, "date" => "2017-07-20T18:17:03+08:00", "state" => Order::PENDING],
    ["order_id" => 2, "amount" => 3000, "bonus" => null, "date" => "2017-07-21T21:38:24+08:00", "state" => Order::NEW],
    ["order_id" => 3, "amount" => 2500, "bonus" => null, "date" => "2017-07-22T09:41:51+08:00", "state" => Order::DONE],
    ["order_id" => 4, "amount" => 1800, "bonus" => null, "date" => "2017-07-23T13:24:31+08:00", "state" => Order::PENDING],
    ["order_id" => 5, "amount" => 1200, "bonus" => null, "date" => "2017-07-24T07:55:47+08:00", "state" => Order::DONE],
);


/*
 * Test procedure
 */
print_r(
    [
        "result" => (new Order($orders))->sample_run()->to_rows()
    ]
);

?>