数据

前段时间做数据分析,有一段BTC的实时数据流缓存成的parquet文件

大小1019MB,数据有3690044条,有86列,除了时间列t是字符串,其他列都是float64类型:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
  	t	price	amount	buyprice_1	buyamount_1	sellprice_1	sellamount_1	buyprice_2	buyamount_2	sellprice_2	...	buyamount_19	sellprice_19	sellamount_19	buyprice_20	buyamount_20	sellprice_20	sellamount_20	wpr	wpr_log	ret
0	2022-10-12T14:15:47+08:00	NaN	NaN	19131.810547	0.33867	19132.880859	0.98643	19131.800781	0.09596	19133.130859	...	0.06449	19134.419922	0.06000	19131.000000	0.04138	19134.500000	0.00152	19132.387528	9.859138	9.859138e+00
1	2022-10-12T14:15:47+08:00	NaN	NaN	19131.619141	0.00836	19132.089844	0.10000	19131.380859	0.06917	19132.099609	...	0.02000	19134.179688	0.00131	19130.789062	0.10494	19134.199219	0.00100	19131.787850	9.859107	-3.134407e-05
2	2022-10-12T14:15:47+08:00	NaN	NaN	19131.619141	1.06900	19131.810547	0.10500	19131.390625	0.00041	19131.900391	...	0.00821	19134.160156	0.06212	19130.710938	0.06000	19134.169922	0.09210	19131.706830	9.859102	-4.234885e-06
3	2022-10-12T14:15:47+08:00	19132.490234	0.28606	19131.619141	0.17314	19131.630859	0.09775	19131.380859	0.00540	19131.900391	...	0.14987	19134.089844	0.00659	19130.609375	0.03131	19134.160156	0.06212	19131.623193	9.859098	-4.371655e-06
4	2022-10-12T14:15:47+08:00	19131.900391	0.02349	19131.890625	0.02948	19132.039062	0.11681	19131.349609	0.00168	19132.429688	...	0.03131	19134.150391	0.25188	19130.550781	0.00999	19134.160156	0.06212	19131.949671	9.859115	1.706470e-05
...	...	...	...	...	...	...	...	...	...	...	...	...	...	...	...	...	...	...	...	...	...
3690040	2022-10-16T23:59:59+08:00	NaN	NaN	19158.089844	0.26098	19158.400391	0.00837	19158.000000	0.00089	19158.410156	...	0.00055	19158.869141	0.00130	19156.720703	0.05300	19158.980469	0.10000	19158.283963	9.860490	3.494641e-07
3690041	2022-10-16T23:59:59+08:00	19158.730469	0.00060	19158.539062	0.04044	19158.730469	0.08000	19158.490234	0.26097	19158.779297	...	0.52199	19159.599609	0.08087	19156.900391	0.00055	19159.609375	0.31406	19158.647272	9.860509	1.896338e-05
3690042	2022-10-16T23:59:59+08:00	19158.730469	0.00244	19158.490234	0.26097	19158.650391	0.00129	19158.000000	0.00089	19158.730469	...	0.02000	19159.689453	0.00104	19156.900391	0.00055	19159.820312	0.03578	19158.573212	9.860506	-3.865605e-06
3690043	2022-10-16T23:59:59+08:00	19158.679688	0.00074	19158.509766	0.04044	19158.650391	0.00129	19158.490234	0.26097	19158.730469	...	0.06097	19159.820312	0.03578	19157.080078	0.00667	19159.919922	0.09157	19158.550647	9.860504	-1.177803e-06
3690044	2022-10-16T23:59:59+08:00	19158.490234	0.00113	19158.619141	0.04746	19158.679688	0.01500	19158.550781	0.00111	19158.810547	.

为什么会有这么多列?

实时数据最少也要包含两部分数据,订单簿(Orderbook)和成交记录(trade history),我这里收集的是买卖20档深度的订单簿(上下各20档),每一裆有两个字段,那就是 20 * 2 *2 = 80个字段了,加上其他的简单字段,86个字段已经不算多了。

而如果我想要收集100档深度的订单簿,那列数量会达到800+,文件的大小肯定也会暴涨。

Go

收集数据的目的当然是用来处理数据,显然对我来说,目前最便捷的方式就是用go来处理数据。

机器内存: 16G内存 + 16G swap

操作系统: archlinux

使用parquet-go库处理

parquet-go 这个库目前start是946,已经算是不错的库了,废话不多说,上代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
	"log"
	"os"
	"time"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/reader"
)

func main() {
	cacheFile := os.Args[1]
	fr, err := local.NewLocalFileReader(cacheFile)
	if err != nil {
		log.Println("Can't open file")
		return
	}

	pr, err := reader.NewParquetReader(fr, nil, 1)
	if err != nil {
		log.Println("Can't create parquet reader", err)
		return
	}

	num := int(pr.GetNumRows())
	res, err := pr.ReadByNumber(num)
	if err != nil {
		log.Println("Can't read", err)
		return
	}
	pr.ReadStop()
	fr.Close()
	log.Println("data:", len(res))
	time.Sleep(time.Minute)
}

可以看到这个代码非常的简单,就是读取所有数据到内存中,然后打印一下总条数。

然而不幸的是,上面的程序根本无法正常运行完,会发生OOM……

是数据量超过“单机极限”吗?

根据我以前做文件系统分析的经验来看,16GB内存,用C++单机程序可以处理/加载千万以上的数据,虽然这个数据样本的列数有点多,但是考虑到全都是浮点数,并不会占用太多内存,所以理论上单机16GB内存应该是足够的。

所以,显然,并不是超过了“单机极限”

可以优化吗?

不幸的是,按照我曾经类似场景的经验来讲,除非在程序运行期间一直触发go的GC,否则没什么可优化的…… 即使一直调用runtime.GC(),内存也不能得到充足的释放。

那么就设置 GOGC=1来重新运行下程序,并使用pmap来查看进程内存占用,最终内存占用23G左右,程序终于跑完了

1
total         23296864K

备注: 实际上,只有使用debug.FreeOSMemory(),才可以释放可观的内存给操作系统

换库

使用apache官方的arrow库来试试, arrow

上代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/apache/arrow/go/v11/arrow/array"
	"github.com/apache/arrow/go/v11/arrow/memory"
	"github.com/apache/arrow/go/v11/parquet"
	"github.com/apache/arrow/go/v11/parquet/pqarrow"
)

func main() {
	alloc := memory.NewGoAllocator()
	ctx := context.Background()
	f, err := os.Open(os.Args[1])
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	// defer f.Close()
	opt := parquet.NewReaderProperties(alloc)
	opt.BufferSize = 1024 * 1024 * 100
	var arrProps pqarrow.ArrowReadProperties
	arrProps.BatchSize = 100000
	arrProps.Parallel = true
	tbl, err := pqarrow.ReadTable(ctx, f, opt, arrProps, memory.DefaultAllocator)
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	f.Close()
	fmt.Println(tbl.NumRows(), tbl.NumCols())
	col1 := tbl.Column(0)

	chunks := col1.Data().Chunks()
	data := chunks[0].(*array.String)
	fmt.Println(data.Value(0))
	fmt.Println(data.Value(10))
	time.Sleep(time.Minute)
}

直接使用GOGC=1来运行程序,可以看到内存占用有9GB左右

1
total          9137148K

好吧,无论如何,程序至少正常运行了

大招1—换rust

9GB的内存,处理369w的数据,这个只能算可用,离好用还差了十万八千里了,看来只能上大招了,换rust!

上代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
use polars::prelude::*;
use std::{thread, time::Duration, env};

fn main(){
    let args: Vec<String> = env::args().collect();
    let cache_file = args[1].as_str();
    let mut file = std::fs::File::open(cache_file).unwrap();
    let df = ParquetReader::new(&mut file).finish().unwrap();
    println!("{:?}",df.shape());
    let datas =df.get(1).unwrap();
    println!("{:?}",     datas[0]);
    println!("{:?}",     datas[10]);
    thread::sleep(Duration::from_secs(60));

}

内存占用3GB左右

1
total          3578196K

大招2—换c++

上代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <thread>
#include <chrono>
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <parquet/exception.h>

int main(int argc, char *argv[])
{
  std::string file = argv[1];
  std::cout << "Reading parquet-arrow-example.parquet at once" << std::endl;
  std::shared_ptr<arrow::io::ReadableFile> infile;
  infile = arrow::io::ReadableFile::Open(file, arrow::default_memory_pool()).MoveValueUnsafe();

  std::unique_ptr<parquet::arrow::FileReader> reader;
  PARQUET_THROW_NOT_OK(
      parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
  std::shared_ptr<arrow::Table> table;
  PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
  std::cout << "Loaded " << table->num_rows() << " rows in " << table->num_columns()
            << " columns." << std::endl;
  std::this_thread::sleep_for(std::chrono::seconds(60)); // 1s

  return 0;
}

c++使用-o2进行编译优化

1
g++ -o2 main.cpp -o cpp -l arrow -lparquet

内存占用大概4GB

1
total          4401860K

思考

从上面的数据可以看到,Go代码即使将GC调到最小,占用内存依然是非常多的,比rust/c++的程序占用内存多2-3倍。

所谓性能,大多数时候就是在人力成本和硬件成本之间做的一个选择,在计算机硬件发展迅速的时候,大多数情况下都会偏向于选择低人工的选择; 而当摩尔定律逐渐失效,硬件成本逐渐增加的时候,也许很多场景又会倾向与选择更底的硬件成本了。

PS: 就我亲身体验的,就不止一家公司(非小公司)业务从python转向go,至于有没有可能、什么时候会再转向rust/c++,那就要看机器成本和人工成本的比重的变化了。

为什么不用大数据

有些人会说,都上百万/千万数据了,为什么不用大数据来出来?

诚然,大数据是处理大型数据集的很不错的选项,spark/flink我也还能玩的动。

问题在于:

  1. 大数据的数据和计算模型和常规程序是不一样的,在这个过程中就必须要重写处理逻辑,而我期望的是回测和实盘的处理逻辑尽量保持一致。
  2. 在硬件资源不够充足的场景下,例如只有3、5台机器,所谓的大数据并不能最充分的利用资源,jvm也是有GC的,表现的也并不会比Go程序好到哪里去。 这里有个例子,在机器资源有限的情况下(3/5台),在某些场景下,例如日志查询,数据简单分析,clickhouse是吊打elasticsearch的。
  3. 大数据技术栈相对闭环,且相对复杂,在团队较小情况下,ROI非常低。

是Go真的不行吗

我相信,只要有足够优秀的工程师+足够好的算法,在这个场景中go肯定也能得到非常大的优化,例如有名的fasthttp就能和rust打的有来有回。

但是这个问题的关键更在于,“人”。

最简单的例子:

  1. 现在写go的都是什么人? 以前写php,写python,写java/c#的 (并没有说这些人不好的意思,只是毕竟和底层语言侧重点不同),像我这种写c++转go的都是少数。
  2. 写rust的都是什么人? 一大堆对C/C++不满足的人。

这就注定了,rust群体中更容易出现在性能/底层上的创新,而go群体中更容易出现在网络/快速开发的创新。

所以从github的trend里,也能看到,经常会出现一些rust写的传统命令行工具替代品,Go也有,但是相对较少。

而“人”的聚集,也代表着“人才”的聚集,“人才”聚集多的地方,当然也更容易出现好的产品。

代码

以上代码可以在下面的仓库里找到:

代码