Группа WeChat для общения:
Личный WeChat (если группа больше недоступна, свяжитесь со мной, чтобы присоединиться):
Распределённая система управления задачами HERA — архитектура и базовые функции (часть 1)
Распределённая система управления задачами HERA — запуск проекта (часть 2)## Введение
С развитием бизнеса на платформе больших данных, ежедневно выполняется множество ETL-задач, управляемых через Hive и shell-скрипты. Как обеспечить правильное выполнение этих множественных ETL-задач, а также возможность автоматического восстановления при возникновении ошибок, отправки уведомлений об ошибках и полного логирования процесса? HERA
— это распределенная система управления задачами, созданная именно для решения таких проблем. С увеличением размера кластера HERA, он может эффективно обслуживать тысячи задач. Это нативная распределенная система управления задачами, которая позволяет быстро добавлять новые рабочие узлы (workers
) и масштабировать кластер. Она поддерживает выполнение скриптов Shell, Hive и Spark, а также может быть легко адаптирована для работы с другими серверными языками, такими как Python.
HERA — это распределенная система планирования задач, основанная на ранее открытой распределенной системе планирования задач (
Zeus
) компании Alibaba.Zeus
был открыт в 2014 году, но после этого больше не поддерживался. Наша компания (Erfu) внедрила систему планирования задачZeus
в 2015 году и использовала её до ноября текущего года. В нашем отделе и даже во всей компании она играет незаменимую роль.За время моей работы сzeus
я вынужден признать его мощность — если размер кластера и конфигурация подходят, он может эффективно выполнять планирование десятков тысяч, а то и сотен тысяч задач. Однако из-за того, что кодzeus
не поддерживается, а фронтенд использует технологию GWT, его сложно поддерживать. Я вместе со своим коллегой (псевдоним: Lingxiao, который сейчас работает в отделе Taobao Alibaba) начали переписыватьzeus
в марте этого года, переименовав его вHera
.*** Адрес проекта: git@github.com:scxwhite/hera.git ***
Система hera
отвечает только за планирование задач и помощь в этом процессе. Конкретные вычисления выполняются на кластерах hadoop
, hive
, yarn
, spark
. Поэтому существует жесткое требование, чтобы выполнение задач hadoop
, hive
, spark
и других было возможно только тогда, когда рабочие процессы (workers
) системы hera
установлены на некоторых машинах этих кластеров. Если это просто скрипт shell
, то ему необходима операционная система Linux
. Для системы Windows
можно использовать себя как master
для тестирования.
Hera
система строго следует архитектурному паттерну master-slave, где главный узел выполняет задачи планировщика и распределителя задач, а рабочие узлы — исполнители конкретных задач. Архитектурная схема представлена ниже:
Целями проектирования системы распределенного планировщика задач hera
являются выполнение основных функций системы zeus
, а также возможность её расширения в соответствии со специфическими требованиями компании. Основные цели включают следующее:- Поддержка периодического планирования задач, зависимого планирования, ручного запуска и восстановления после прерывания;
shell
, hive
, python
, spark-sql
, java
;DAG
) и выполнение задач согласно их зависимости;HDFS
и использования этих файловых ресурсов;HA
), позволяющая автоматически восстанавливать задачи при сбое одного узла, а также обеспечивать перехват роли главного узла рабочими узлами при его отключении;Hera
;map-reduce
и YARN
в режиме реального времени.- (И многое другое, ждите новых возможностей)Основано на cron-выражении для анализа времени выполнения задачи. При достижении указанного времени задача добавляется в очередь задач.
Для тестирования создана новая shell
задача, которая должна выполняться в 15:40. В конфигурационных параметрах указываются наши собственные настройки, а ${}
используется для замены значений внутри скрипта. Конфигурация группы, к которой принадлежит задача, также может использоваться, при этом повторяющиеся параметры будут заменяться последними значениями.
Когда задача достигает своего времени выполнения, она начинает работу и выводит желаемый результат: распределённая система управления задачами Hera
- Зависимое планирование
Большая часть наших задач имеет зависимости. Только после завершения вычислений предыдущей задачи можно приступить к следующему шагу. Зависимые задачи добавляются в очередь задач только после завершения всех зависящих от них задач.Пример существующего состояния выполнения задачи
Ручное планирование представляет собой задачу, которую можно запустить вручную. После успешного выполнения задача автоматически добавляется в очередь задач. Успешное выполнение ручной задачи не отправляет уведомление нижестоящим задачам (то есть, задачам, зависящим от данной задачи).
Ручное восстановление аналогично ручному планированию, но отличается тем, что если задача успешно выполнена, то уведомление отправляется нижестоящим задачам о том, что данная задача завершена.
Распределённая система управления задачами Hera
использует нативный ProcessBuilder
из JDK
, чтобы выполнять задачи через командную строку (shell
) на серверах worker/master
. Например, python
задача
Можно сначала написать python
скрипт hello.py
(здесь просто вставлено изображение), а затем загрузить этот скрипт в hdfs
При выполнении можно использовать следующий пример:
download[hdfs:///hera/hello.py hello.py];
python hello.py;
чтобы запустить этот скрипт.
Таким образом, полный python
скрипт может быть выполнен через вызов командной строки, используя внутренние механизмы выполнения задач hera
, парсер синтаксиса скрипта и выполнение python
задачи. В действительности, таким образом можно даже выполнять скрипты на других серверных языках, таких как java
, scala
, hive-udf
.
Для выполнения скриптов Hive и Spark-SQL используется команда
-f
для выполнения файла.
Выполнение задач строго следует за их зависимыми отношениями, что позволяет просматривать состояние выполнения верхних и нижних уровней задач. Через граф зависимости можно легко понять причину того, почему конкретная задача ещё не была выполнена, а также последствия её удаления.
Когда количество задач большое и зависимости между ними сложны, требуется граф DAG для просмотра отношений между задачами.
Жёлтый цвет указывает на выполняющуюся задачу, серый — на закрытую задачу, красный — на неудачно завершившуюся задачу, зелёный — на успешно завершившуюся задачу. Справа отображается подробная информация о задаче.
Конечно, здесь поддерживаются как просмотр всех задач, так и просмотр выбранных задач. При клике на задачу будут показаны все задачи, зависящие от неё.
Это уже было упомянуто выше в разделе Поддержка различных типов задач: shell, hive, python, spark-sql, java, поэтому повторяться не будем.
При выполнении задачи можно просматривать логи для получения информации о текущих событиях.### Поддержка автоматического восстановления после сбоев задач
Конечно, в некоторых случаях задачи могут потерпеть неудачу, например, из-за временного отключения сети — возможно, через секунду всё будет в порядке. Для очень важных задач рекомендуется включить возможность повторной попытки при неудаче выполнения задачи. Установите количество попыток повтора и интервал времени между ними. После этого master
будет выполнять повторную попытку неудачных задач согласно вашим настройкам.Например, установите количество попыток повтора 3, а интервал времени между попытками — 10 минут.
При неудачной попытке выполнения задачи происходит три повторных попытки. Временной промежуток между началом и концом последней неудачной попытки составляет 10 минут.
Создание распределённой системы невозможно без реализации кластерной защиты от аварийных ситуаций. При возникновении отказа сервера или внезапного прекращения связи требуется обеспечить автоматическую восстановление соединения кластера. На этом этапе важно также обеспечить восстановление задач после восстановления соединения.
Это сложно представить здесь, ждите возможности испытать это позже.### Поддержка визуального просмотра информации о нагрузке master/work, используемой памяти, процессах и данных CPU
Здесь можно просмотреть процент использования CPU пользователями, системой, свободное время CPU и другую информацию…
Сначала посмотрим на картинку
Ха-ха, немного примитивно, но предоставляются API-интерфейсы. Нужна помощь с фронтендом, так как сейчас работаю один. Жду помощи.
Информация включает использование памяти машинами, среднюю нагрузку на каждое ядро, общее количество памяти машин, время сбора статистики. Есть три очереди: running
, manualRunning
, debugRunning
. На машинах с меткой master
они представляют собой задачи, ожидающие выполнения, а на машинах с меткой work
— задачи, которые выполняются. Они соответственно относятся к автоматическому планированию зависимых задач, задачам, выполняемым вручную, и задачам разработки (в другой разработочной среде).### Поддержка визуального просмотра информации о текущих задачах, неудачных задачах, успешных задачах и задачах с наибольшим временем выполнения. Также поддерживаются линейные графики истории выполнения задач, конкретно до количества выполненных задач, неудачных задач, успешных задач, общего количества задач, общего количества неудачных задач и общего количества успешных задач за день.
Круговая диаграмма позволяет просматривать подробную информацию о конкретных задачах при нажатии.
При необходимости отслеживания некоторых задач можно использовать функцию отслеживания задач для получения информации о неудачном выполнении.
Хотя мы видим неудачные задачи на главной странице, иногда нам не хочется следить за чужими задачами. В этом случае можно создать свою группу и добавить задачи в эту группу. Это позволит вам через обзор задач группы проверять состояние задач.
Например, если вы выбрали "Обзор задач",Вы можете просмотреть все данные о задаче, будь то выполнение, невыполнение, неудачное выполнение, успешное выполнение или выполнение в процессе. Если задача ещё не выполнена, будет показана информация о том, что некоторые зависимые задачи также не выполнены. Если зависимые задачи выполнены, будут показаны время выполнения и другая информация. Также справа есть возможность фильтровать данные, например, просмотреть неудачные или выполняемые задачи.
На самом деле, мы реализовали множество других функций, таких как удаление задач с учётом наличия зависимых задач, закрытие задач с учётом наличия зависимых задач и т.д. Кроме того, мы внедрили анализ зависимости Hive SQL
, анализ полей, анализ логов выполнения задач и т.д., которые были реализованы в других проектах и могут быть внедрены в Hera
.
Рассмотрим вопрос о публикации проекта в качестве open source в декабре. Сейчас активно проводится работа по его организации. Приведён ниже адрес блога Lingxiao: https://blog.csdn.net/pengjx2014/article/details/81276874 Все вышеуказанная информация является тестовой офлайн информации
***Адрес проекта: git@github.com:scxwhite/hera.git***
После того как вы клонируете hera
с помощью git
, найдите файл hera.sql
в директории hera/hera-admin/resources
. В своей базе данных создайте необходимые таблицы и внесите начальные данные.Теперь вы можете найти файл application.yml
в директории hera/hera-admin/resources
. Измените конфигурацию источника данных hera
(измените конфигурацию в разделе druid.datasource
) для продолжения дальнейших действий.```yml
spring:
profiles:
active: @env@ # текущее окружение, которое указывается при сборке через -P
http:
multipart:
max-file-size: 100Mb # максимальный размер файла для загрузки
max-request-size: 100Mb # максимальный размер запроса для загрузки
freemarker:
allow-request-override: true
cache: false
check-template-location: true
charset: utf-8
content-type: text/html
expose-request-attributes: false
expose-session-attributes: false
expose-spring-macro-helpers: false
suffix: .ftl
template-loader-path: classpath:/templates/
request-context-attribute: request
druid:
datasource:
username: root # имя пользователя базы данных
password: XIAOSUDA # пароль базы данных
driver-class-name: com.mysql.jdbc.Driver # драйвер базы данных
url: jdbc:mysql://localhost:3306/hera?characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnect=true&allowMultiQueries=true
initial-size: 5 # начальное количество соединений в пуле
min-idle: 1 # минимальное количество живых соединений
max-active: 16 # максимальное количество соединений в пуле
max-wait: 5000 # максимальное время ожидания получения соединения, мс. После того как указано maxWait, по умолчанию используется синхронизация с использованием fair lock, что может привести к снижению производительности при высокой конкуренции. Это можно исправить, установив useUnfairLock в значение true.
time-between-connect-error-millis: 60000 # интервал времени между проверками соединений, мс. Уничтожает пустые соединения, если они остаются незадействованными более minEvictableIdleTimeMillis.
``` min-evictable-idle-time-millis: 300000 # Минимальное время бездействия соединения перед его удалением, мс.
test-while-idle: true # При запросе соединения проверяет, действителен ли он, если время его бездействия превышает timeBetweenEvictionRunsMillis.
test-on-borrow: true # При запросе соединения проверяет, действителен ли он.
test-on-return: false # При возврате соединения проверяет, действителен ли он.
connection-init-sqls: set names utf8mb4
validation-query: select 1 # SQL-запрос для проверки действительности соединения.
validation-query-timeout: 1 # Время ожидания выполнения запроса для проверки действительности соединения, секунды.
log-abandoned: true
stat-mergeSql: true
filters: stat,wall,log4j
connection-properties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
сервер:
порт: 8080
контекст-путь: /heraочистка:
путь: ${server.context-path}
#Глобальная конфигурация Hera
Hera:
defaultWorkerGroup: 1 #По умолчанию worker хост группы ID
preemptionMasterGroup: 1 #Группа ID хоста для захвата мастеров
excludeFile: jar;war
maxMemRate: 0.80 #Максимальная часть используемой памяти от общего объема памяти, по умолчанию 0.80. Когда использование памяти рабочими достигает этого значения, новые задачи больше не будут отправлены этим рабочим.
maxCpuLoadPerCore: 1.0 #CPU нагрузка на ядро равна средней нагрузке системы за последние минуту ÷ количеству ядер процессора, по умолчанию 1.0. Когда средняя нагрузка использования CPU рабочими достигает этого значения, новые задачи больше не будут отправлены этим рабочим.
scanRate: 1000 #Частота сканирования очереди задач (миллисекунды)
systemMemUsed: 4000 #Система занимает память
perTaskUseMem: 500 #Предположительно каждая задача использует 500 МБ памяти
requestTimeout: 10000 #Время ожидания асинхронных запросов
channelTimeout: 1000 #Время ожидания запросов Netty heartBeat : 3 # частота передачи данных сердечника
downloadDir : /opt/logs/spring-boot
hdfsLibPath : /hera/hdfs-upload-dir # здесь обязательно должен быть HDFS путь, все загружаемые приложения будут храниться в этом пути
schedule-group : online
maxParallelNum: 2000 # максимальное количество параллельных задач, которое разрешено мастером. Когда это число превышено, задачи будут помещены в очередь ожидания.
connectPort : 9887 # порт связи Netty
admin: biadmin # имя пользователя администратора
taskTimeout: 12 # максимальное время выполнения одной задачи (единицы измерения: часы)
env: @env@# Конфигурация отправителя email
почта:
сервер: smtp.mxhichina.com
протокол: smtp
порт: 465
пользователь: xxx
пароль: xxx
логирование:
конфиг: classpath:logback-spring.xml
путь: /opt/logs/spring-boot
уровень:
корень: INFO
org.springframework: ERROR
com.dfire.common.mapper: ERROR
mybatis:
конфигурация:
mapUnderscoreToCamelCase: true
# Конфигурация Spark
spark :
адрес: jdbc:hive2://localhost:10000
драйвер: org.apache.hive.jdbc.HiveDriver
имя_пользователя: root
пароль: root
мастер: --master yarn
память_драйвера: --driver-memory 1g
ядра_драйвера: --driver-cores 1
память_выполнителя: --executor-memory 1g
ядра_выполнителя: --executor-cores 1
---
## Разработка окружение
spring:
профили: dev
логирование:
уровень:
com.dfire.logs.ScheduleLog: ERROR
com.dfire.logs.HeartLog: ERROR
---
## Производственное окружение обычно совпадает с разработочной средой
spring:
профили: daily
---
## Предпроизводная среда
spring:
profiles: pre
druid:
datasource:
url: jdbc:mysql://localhost:3306/lineage?characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&autoReconnect=true&allowMultiQueries=true
username: root
password: root
# Настройка Spark
spark :
адрес: jdbc:hive2://localhost:10000 # Адрес Spark
мастер: --master yarn
память_драйвера: --driver-memory 2g
ядра_драйвера: --driver-cores 1
память_выполнителя: --executor-memory 2g
ядра_выполнителя: --executor-cores 1
---
## Операционная среда
spring:
profiles: publish
druid:
datasource:
url: jdbc:mysql://localhost:3306/lineage?characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&autoReconnect=true&allowMultiQueries=true
username: root
password: root
# Настройка Spark
spark :
адрес: jdbc:hive2://localhost:10000
мастер: --master yarn
память_драйвера: --driver-memory 2g
ядра_драйвера: --driver-cores 1
память_выполнителя: --executor-memory 2g
ядра_выполнителя: --executor-cores 1```
## Упаковка и развертывание
После выполнения вышеописанных действий можно использовать команду сборки Maven:
mvn clean package -Dmaven.test.skip -Pdev
После упаковки можно перейти в директорию `hera-admin/target` и проверить наличие файла `hera.jar`. Для запуска проекта используйте следующую команду:
java -server -Xms4G -Xmx4G -Xmn2G -jar hera.jar
Проект будет доступен по адресу:
Для входа в систему используйте учётную запись `hera` с паролем `biadmin`.
Кроме того, прилагаю свой скрипт запуска:
```sh
#!/bin/sh
JAVA_OPTS="-server -Xms4G -Xmx4G -Xmn2G -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+CMSParallelRemarkEnabled -XX:CMSFullGCsBeforeCompaction=5 -XX:+CMSParallelInitialMarkEnabled -XX:CMSInitiatingOccupancyFraction=80 -verbose:gc -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -Xloggc:/opt/logs/spring-boot/gc.log -XX:MetaspaceSize=128m -XX:+UseCMSCompactAtFullCollection -XX:MaxMetaspaceSize=128m -XX:+CMSPermGenSweepingEnabled -XX:+CMSClassUnloadingEnabled -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/logs/spring-boot/dump"
log_dir="/opt/logs/spring-boot"
log_file="/opt/logs/spring-boot/all.log"
jar_file="/opt/app/spring-boot/hera.jar"
# Если папка логов отсутствует, создаём её
if [ ! -d "${log_dir}" ]; then
echo "Создание папки логов: ${log_dir}"
mkdir -p "${log_dir}"
echo "Папка логов создана: ${log_dir}"
fi
```# Проверка наличия JAR файла в родительской директории
```bash
if [ -f "${jar_file}" ]; then
# Запуск JAR файла с перенаправлением вывода ошибок и стандартного вывода
nohup java $JAVA_OPTS -jar ${jar_file} 1>"${log_file}" 2>"${log_dir}"/error.log &
echo "Запуск завершён"
exit 0
else
echo -e "\033[31mФайл JAR ${jar_file} отсутствует!\033[0m"
exit 1
fi
```## Тестирование
Теперь вы успешно вошли. Далее вам потребуется добавить IP адреса машин, выполняющих задачи, в раздел управления `worker`. Выберите группу машин (группа — это понятие, которое позволяет различать различные окружения для разных `workers`, например, некоторые могут использоваться для выполнения задач Spark, другие для Hadoop и так далее).
Для тестирования можно использовать `master` как IP адрес машины, выполняющей задачи, но в рабочей среде рекомендуется не использовать `master` для выполнения задач. Убедитесь, что ваш `worker` имеет клиентскую часть для нужных кластеров.
Добавьте адреса `worker` и группы машин в соответствующий раздел управления.
Существует 30-минутное время ожидания до того момента, когда `master` обнаружит новый `worker`. Для быстрого тестирования можно перезапустить `master`.
После перезапуска войдите в центр управления и введите `1` в поле поиска, затем нажмите Enter.
Вы должны заметить задачу echoTest. В данный момент вы не можете запустить эту задачу, поскольку все ваши задачи требуют авторизации пользователя. Например, если вы используете учётную запись `hera`, то на всех ваших `worker` машинах должна существовать учётная запись `hera`.
Иначе при попытке запуска задачи будет возникать ошибка `sudo: неизвестный пользователь: hera`.
в это время можно добавить пользователя `hera` на машину `work`, которую мы используем:```bash
useradd hera
Если система — это mac
, то можно использовать следующие команды для создания пользователя hera
:
sudo dscl . -create /Users/herа
sudo dscl . -create /Users/herа UserShell /bin/bash
sudo dscl . -create /Users/herа RealName "распределенная задача планирования Hera"
sudo dscl . -create /Users/herа UniqueID "1024"
sudo dscl . -create /Users/herа PrimaryGroupID 80
sudo dscl . -create /Users/herа NFSHomeDirectory /Users/herа
В это время нажмите "Ручной запуск" -> выберите версию -> выполните. В это время задача будет выполняться, нажмите кнопку "Просмотреть лог", чтобы увидеть записи выполнения задач.
Если задача выполнена с ошибками, содержание лога ошибок будет таким:
sudo: no tty present and no askpass program specified
Тогда в это время вам следует обеспечить, что пользователь, который запускает проект hera
, имеет права доступа sudo -u hera
(не требуется ввод пароля от root
, чтобы выполнить sudo -u hera echo 1
). Это можно сделать через конфигурацию файла sudo visudo
.
Например, если пользователь, который запускает hera
, является wyr
, вы должны сначала выполнить команду sudo visudo
в терминале, затем добавить следующую строку:
wyr ALL=(ALL) NOPASSWD:ALL
Как показано на следующем рисунке:
Таким образом, при переходе между пользователями не потребуется ввод пароля. Конечно, если вы используете пользователя root
для запуска, вы можете пропустить этот шаг.Если все настройки завершены успешно, вы сможете видеть лог выполнения задачи.
На данном этапе задача была выполнена вручную.
Разработочная центральная система, как следует из названия, служит местом, где мы проводим разработку (хотя мы также можем добавлять задачи прямо в распределительную систему, рекомендуется сначала протестировать задачи в разработочной системе, а затем уже добавлять их в распределительную систему).
***Адрес проекта: git@github.com:scxwhite/hera.git ***
Как показано на рисунке, разработочная система состоит из двух папок — личные документы
и общие документы
. Эти две папки нельзя удалять.

При этом ниже в области редактирования будет отображен журнал текущего выполняющегося задания.

Кроме того, вы можете просмотреть все журналы, кликнув на "Исторический журнал" внизу.

Можно выбрать строки кода, которые вы хотите выполнить, используя мышь в области редактирования, а затем нажать "Выполнить выбранный код".
## Загрузка ресурсов
Если вам нужно загрузить ресурсы (py
, jar
, sql
, hive
, sh
, js
, txt
, png
, jpg
, gif
и т.д.), обратите внимание, что ваш master
и worker
должны иметь среду Hadoop
, которая может выполнять команду hadoop fs -copyFromLocal
.
После загрузки ресурсов.
В верхней части будет указан адрес использования этого файла ресурсов.
/hera/hdfs-upload-dir/hera-20181229-110455.sql
Если вы используете команду spark-submit
или hive udf add jar
, просто добавьте путь hadoop
. Пример:
add jar hdfs:///hera/hive_custom_udf/2dfire-hivemr-log.jar;
или:
spark2-submit --class com.dfire.start.App \
--jars hdfs:///spark-jars/common/binlog-hbase-1.1.jar \
Конечно, если это какие-то Python скрипты или txt файлы, которые нужно скачать и выполнить, то используется следующий командный синтаксис:
download[hdfs:///hera/hdfs-upload-dir/hera-20181229-110455.sql hera.sql]
Запуск download
как специализированной команды для hera
. Внутри [ ]
содержится две части, разделённые пробелом. Левая часть от пробела — путь к файлу в HDFS, правая часть — переименованное имя файла после скачивания.
Синхронизация задач ещё не реализована.
При работе со скриптами в центре разработки они автоматически сохраняются. Однако можно также использовать кнопку "Сохранить" для ручного сохранения.
Для использования Spark требуется самостоятельно выбрать версию Spark и установить её на worker'ах. Конкретные методы установки и примеры будут предоставлены позже.
You can comment after Login
Inappropriate content may be displayed here and will not be shown on the page. You can check and modify it through the relevant editing function
If you confirm that the content does not involve inappropriate language/advertisement redirection/violence/vulgar pornography/infringement/piracy/false/insignificant or illegal content related to national laws and regulations, you can click submit to make an appeal, and we will handle it as soon as possible.
Comments ( 0 )