Использование Go и ClickHouse для ввода и применения крупномасштабных данных.

Здравствуйте. В этой статье мы рассмотрим применение метода ввода и использования крупномасштабных данных с помощью Go и ClickHouse. Во-первых, Go позволяет выполнять высокопроизводительную параллельную обработку, что делает его пригодным для управления массивными наборами данных. Аналогично, ClickHouse – это база данных, известная тем, что хранит большие объемы данных и обеспечивает их быстрое агрегирование, что делает ее удобной для анализа больших данных.

Использование Go и ClickHouse для ввода и применения крупномасштабных данных.

Сначала мы запустим ClickHouse с помощью Docker. Здесь порт ‘8123’ используется для HTTP-коммуникаций, а порт ‘9000’ позволяет напрямую взаимодействовать с сервером. В этой статье мы будем использовать только порт ‘9000’, но на всякий случай мы также откроем и развернем порт ‘8123’.

Во-первых, если вы просто развертывали сервер через Docker, то в последних версиях отсутствуют права пользователей, поэтому необходимо изменить файл ‘users.xml’, расположенный в папке ‘etc/clickhouse-server’.

<?xml version="1.0"?>
<clickhouse>
    <profiles>
       <default>
           <max_memory_usage>10000000000</max_memory_usage>
            <load_balancing>random</load_balancing>
        </default>       
    </profiles>
    <users>
       <default>
           <password></password>
        <networks>
                <ip>::/0</ip>
            </networks>
            <profile>default</profile>
            <quota>default</quota>
            <access_management>1</access_management> 
        </default>
    </users>
    <quotas>
        <default>
            <interval>
                <duration>3600</duration>
                <queries>0</queries>
                <errors>0</errors>
                <result_rows>0</result_rows>
                <read_rows>0</read_rows>
                <execution_time>0</execution_time>
            </interval>
        </default>
    </quotas>
</clickhouse>

В производстве идеальным вариантом будет создание XML напрямую, но пока, поскольку это только тест, необходимо вручную активировать 1, который вы видите выше. Основные настройки закомментированы

Теперь, когда у нас есть разрешение на использование запросов из учетной записи по умолчанию, давайте воспользуемся этим разрешением для создания суперпользователя root с помощью команд create user root identified by ‘1111’; и grant all on . to root with grant option;.

Использование Go и ClickHouse для ввода и применения крупномасштабных данных.
Использование Go и ClickHouse для ввода и применения крупномасштабных данных.

Войдя в систему под именем root, проверьте базы данных, и если logdb еще не существует, создайте его, как показано выше.

CREATE TABLE item_log
                (
                    log_id UUID DEFAULT generateUUIDv4(),
                    timestamp DateTime DEFAULT now(),
                    item_id UInt64,
                    user_id UInt64
                ) ENGINE = MergeTree()
                ORDER BY (timestamp, log_id);

Давайте просто воспользуемся приведенным выше SQL для создания таблицы, которая будет использоваться в данной статье.

Использование Go и ClickHouse для ввода и применения крупномасштабных данных.

Если база данных успешно создана, как показано на рисунке выше, то нам осталось выполнить еще один шаг подготовки в ClickHouse.

ALTER USER root SETTINGS async_insert = 1

Эта команда чрезвычайно важна. При параллельной вставке больших объемов данных, как в ClickHouse, нам необходимо использовать асинхронную вставку, чтобы минимизировать нагрузку и предотвратить потерю данных. Однако по умолчанию эта опция отключена. Поскольку мы не можем решить эту задачу для каждой таблицы, давайте управлять этой опцией на стороне пользователя.

func getConn() (driver.Conn, error) {
 dialCount := 0
 conn, err := clickhouse.Open(&clickhouse.Options{
  Addr: []string{"127.0.0.1:9000"},
  Auth: clickhouse.Auth{
   Database: "logdb",
   Username: "root",
   Password: "1111",
  },
  DialContext: func(ctx context.Context, addr string) (net.Conn, error) {
   dialCount++
   var d net.Dialer
   return d.DialContext(ctx, "tcp", addr)
  },
  Debug: true,
  Debugf: func(format string, v ...any) {
   fmt.Printf(format, v)
  },
  Settings: clickhouse.Settings{
   "max_execution_time": 60,
  },
  Compression: &clickhouse.Compression{
   Method: clickhouse.CompressionLZ4,
  },
  DialTimeout:          time.Second * 30,
  MaxOpenConns:         5,
  MaxIdleConns:         5,
  ConnMaxLifetime:      time.Duration(10) * time.Minute,
  ConnOpenStrategy:     clickhouse.ConnOpenInOrder,
  BlockBufferSize:      10,
  MaxCompressionBuffer: 10240,
  ClientInfo: clickhouse.ClientInfo{
   Products: []struct {
    Name    string
    Version string
   }{
    {Name: "k.d-app", Version: "0.0.1"},
   },
  },
 })

 if err != nil {
  return nil, err
 }
 err = conn.Ping(context.Background())
 if err != nil {
  return nil, err
 }
 return conn, err
}

Во-первых, здесь представлена простая функция подключения ClickHouse к Go. С помощью вышеупомянутой функции мы создали базовый, но всеобъемлющий механизм подключения. Используя его, мы проведем эксперимент по виртуальной вставке больших объемов данных.

clickhouseConn, err := getConn()
 if err != nil {
  log.Fatal(err)
 }
 for i := 0; i < 10; i++ {
  for ii := 0; ii < 10_000; ii++ {
   err := clickhouseConn.AsyncInsert(context.Background(), `INSERT INTO item_log(timestamp, item_id, user_id) values (now(), ?,?)`, false, rand.Uint64(), uint64(i))
   if err != nil {
    log.Fatal(err)
    return
   }
  }
 }

 fmt.Println("clickhouse batch success!!")

Дальнейший код прост. Стоит отметить, что в данной статье метод с использованием sql/database не применяется. Официально API clickhouse-go/v2 был проверен на практике и заявлен как более производительный. Кроме того, он в большей степени соответствует тому направлению, на которое мы ориентируемся в нашем проекте. Кроме того, как уже говорилось ранее, для обеспечения стабильного ввода данных мы будем использовать асинхронный метод – функцию asyncInsert.

Использование Go и ClickHouse для ввода и применения крупномасштабных данных.

При дальнейшем рассмотрении видно, что первоначально для тестирования было добавлено 10 записей данных, и кроме этого мы можем подтвердить, что 100 000 записей данных были успешно вставлены.

rows, err := clickhouseConn.Query(context.Background(), "SELECT * FROM item_log WHERE user_id = ?", 1)
 if err != nil {
  fmt.Println("errror", err)
 }
 cnt := 0
 for rows.Next() {
  il := &itemLog{}
  if err := rows.ScanStruct(il); err != nil {
   log.Fatal(err)
   return
  }
  /*In reality, it's big data processing.*/
  fmt.Println(il)
  cnt++
 }
 fmt.Println("total", cnt)
 defer rows.Close()
 log.Print(rows.Err())

Извлечение данных не представляет сложности. Однако, в отличие от широко используемых нами mysql и mariadb, clickhouse предоставляет данные в потоковом формате, что позволяет эффективно работать с большими объемами данных.

Теперь пришло время посмотреть на результаты. Поскольку мы запрашивали только столбцы со значением 1, ожидается увидеть 10001 запись.

Использование Go и ClickHouse для ввода и применения крупномасштабных данных.

Фантастика. Мы видим, что данные были получены быстро и без каких-либо пропусков. В нашем недавнем SaaS-проекте команда размышляла над тем, какую базу данных использовать для загрузки и агрегирования больших объемов данных, и мы открыли для себя мощную базу данных ClickHouse.

Она отлично сочетается с Go, так что всем стоит попробовать!!!

+1
0
+1
0
+1
0
+1
0
+1
0

Ответить

Ваш адрес email не будет опубликован. Обязательные поля помечены *