Классика баз данных - статьи

         

DBInputFormat


Основная идея состоит в том, что программист MapReduce через класс DBInputFormat представляет SQL-запрос. Последующее выполнение производится реализацией DBInputFormat и является прозрачным для программиста MapReduce. Класс DBInputFormat ассоциирует некоторый модифицированный SQL-запрос с каждым Mapper'ом, запущенным Hadoop. Затем каждый Mapper посылает запрос в СУБД через стандартный драйвер JDBC, получает в ответ часть результатов запроса и в параллель с другими Mapper'ами обрабатывает результаты. Подход DBInputFormat является корректным, поскольку объединение всех запросов, посылаемых всеми Mapper'ами, эквивалентно исходному SQL-запросу.

В подходе DBInputFormat обеспечиваются два интерфейса для обеспечения прямого доступа программам MapReduce к данным из СУБД. Мы посмотрели на исходный код реализации подхода DBInputFormat. Основная реализация является одной и той же для обоих интерфейсов. Эту реализацию можно резюмировать следующим образом. В первом интерфейсе программа MapReduce обеспечивает имя пользователя, пароль и URL СУБД, а также имя таблицы T, список P имен столбцов, которые следует выбрать, необязательные фильтрующие условия C и список имен столбцов O для использования в разделе ORDER BY. Реализация DBInputFormat сначала генерирует запрос "SELECT count(*) from T where C" и посылает его в СУБД для получения числа строк (R) в таблице T. Во время выполнения реализация DBInputFormat знает число Mapper'ов (M), запущенных Hadoop (это число либо обеспечивается пользователем в командной строке, либо берется из конфигурационного файла Hadoop) и ассоциирует с каждым Mapper'ом следующий запрос Q. Каждый Mapper подключается к СУБД, посылает Q через JDBC-подключение и получает результаты.

SELECT P FROM T WHERE C ORDER BY O LIMIT L

OFFSET X (Q)

При получении запроса Q СУБД реально выполняет запрос SELECT P FROM T WHERE C ORDER BY O, но возвращаются только L строк результата со смещением X. M запросов, посылаемых в СУБД M Mapper'ами, являются почти идентичными, за исключением того, что значения L и X в них различны.
Для i-го Mapper'а (где 1 ≤ i ≤ M - 1), который не является последним Mapper'ом, L = ⌊R / M⌋, и X = (i - 1) × ⌊R / M⌋. Для последнего Mapper'а L = R - (M - 1) × ⌊R / M⌋, и X = (M - 1) × ⌊R / M⌋.

Во втором интерфейсе класса DBInputFormat программа MapReduce может предоставить произвольный SQL-запрос SQ на выборку данных, результаты которого являются входными данными для Mapper'ов. В этом случае программа MapReduce должна предоставить и запрос со счетчиком (count query) QC, который должен возвращать целочисленное значение, являющееся числом строк в результате запроса SQ. Класс DBInputFormat посылает в СУБД запрос QC, чтобы получить число строк (R), а дальнейшая обработка – та же самая, что и в первом интерфейсе.

Хотя понятно, что подход DBInputFormat, обеспечиваемый компанией Claudera, упрощает процесс доступа к реляционным данным, он не обеспечивает должного роста производительности при увеличении числа Mapper'ов. С подходом DBInputFormat связано несколько проблем производительности. В обоих интерфейсах каждый Mapper для получения своего поднабора реляционных данных посылает в СУБД, по существу, один и тот же запрос, только с разными значениями в разделах LIMIT и OFFSET. Требуются и указываются программой MapReduce столбцы упорядочивания, которое используется для корректного разделения результатов запроса между всеми Mapper'ами, даже если самой программе MapReduce не нужны отсортированные входные данные. За счет этого достигается параллельность обработки реляционных данных Mapper'ами. СУБД приходится выполнять столько запросов, сколько Mapper'ов имеется в системе Hadoop, и, конечно, это не эффективно, особенно, если число Mapper'ов велико.

Отмеченные проблемы производительности особенно серьезны для параллельной СУБД, в которой, как правило, имеются много одновременно выполняемых запросов и крупные наборы данных. Кроме того, требуемое упорядочивание/сортировка – это дорогостоящая операция в параллельных СУБД, поскольку строки таблицы не сохраняются в каком-либо одном узле, и для сортировки требуется перераспределение строк по узлам.


Доступ к данным Hadoop из SQL с использованием табличной UDF


В этом разделе мы опишем, как можно обеспечить прямой доступ к данным Hadoop через SQL-запросы и использовать эти данные совместно с реляционными данными Teradata EDW для выполнения интегрированного анализа данных. Мы обеспечиваем табличную UDF (User Defined Function – функцию, определяемую пользователями), называемую HDFSUDF, которая "вытягивает" данные из Hadoop в Teradata EDW. Например, в следующем SQL-запросе вызывается HDFSUDF для загрузки данных из файла Hadoop с именем mydfsfile.txt в таблицу Tab1 в Teradata EDW:

INSERT INTO Tab1 SELECT * FROM TABLE(HDFSUDF (‘mydfsfile.txt’)) AS T1;

Заметим, что после создания табличной UDF HDFSUDF и предоставления ее пользователям она вызывается подобно любой другой UDF. Для пользователей этой табличной UDF несущественно, каким образом данные перемещаются из Hadoop в Teradata EDW. Обычно табличная UDF HDFSUDF пишется таким образом, чтобы при ее вызове из SQL-запроса она выполнялась в каждом AMP. Однако ее можно написать и таким образом, чтобы при вызове из SQL-запроса она выполнялась в каком-либо одном AMP или в какой-либо группе AMP. Каждый экземпляр HDFSUDF, выполняемый в некотором AMP, отвечает за извлечение некоторой части файла Hadoop. Табличная функция HDFSUDF может также производить фильтрацию и преобразование данных по мере того, как эта функция доставляет строки в процессор SQL. Примерный код HDFSUDF и другие подробности доступны на Web-сайте Teradata Developer Exchange [1]. Когда в некотором AMP запускается экземпляр UDF, этот экземпляр связывается с NameNode в Hadoop, который заведует метаданными относительно mydfsfile.txt. Метаданные Hadoop NameNode включают информацию о том, какие блоки файла Hadoop сохраняются, и в каких узлах они реплицируются. В нашем примере каждый экземпляр UDF обращается к NameNode и обнаруживает общий размер S файла mydfsfile.txt. Затем табличная UDF запрашивает у Teradata EDW номер своего собственного AMP и общее число AMP. На основе этих фактов каждый экземпляр UDF вычисляет смещение в файле mydfsfile.txt, от которого он начнет читать данные из Hadoop.


Для любого запроса от экземпляров UDF к системе Hadoop NameNode устанавливает, какие DataNode в Hadoop отвечают на возврат требуемых данных. Экземпляр табличной UDF, выполяемый на некотором AMP, получает данные непосредственно от тех DataNode, которые сохраняют требуемые блоки данных. Заметим, что никакие данные из файла Hadoop никогда не маршрутизируются через NameNote. Все это делается напрямую от одного узла другому узлу. В нашей примерной реализации [1] мы просто вынуждаем N-ый AMP в системе загружать N-ую порцию файла Hadoop. В зависимости от потребностей приложений можно обеспечить другие типы отображений.

При принятии решения о том, какую часть файла следует загружать каждому AMD с использованием табличной UDF, нужно убедиться, что, в конечном счете, все экземпляры UDF прочитают все байты файла Hadoop, и каждый байт будет прочитан только один раз. Поскольку каждый AMP запрашивает данные из Hadoop, посылая в своем запросе смещение в байтах до позиции файла, с которого должно начаться чтение, нам требуется гаантировать, что последняя строка, прочитанная каждым AMP, является полной, а не частичной строкой (если экземпляры UDF обрабатывают входной файл в режиме "строка за строкой"). В нашей примерной реализации [1] у файла Hadoop, который требуется загрузить, строки имеют фиксированный размер; поэтому мы можем простым образом вычислить начальное и конечное смещение в байтах требуемой порции данных для любого AMP. В зависимости от формата входного файла и потребностей приложений назначению каждому AMP соответствующей порции файла может потребоваться уделять более серьезное внимание.

После загрузки данных Hadoop в Teradata мы можем анализировать набор данных Hadoop точно так же как любые другие данные, сохраняемые в EDW. Более интересно то, что мы можем выполнять интегрированный анализ данных над реляционными данными, хранимыми в Teradata EDW, и внешними данными, исходно сохранявшимися в Hadoop, без потребности в создании новой таблицы и загрузке в нее данных Hadoop.




Это демонстрируется в следующем примере. Предположим, что у некоторой телекоммуникационной компании имеется файл Hadoop packets.txt, в котором сохраняется информация о сетевых пакетах, и строки которого имеют формат <source-id, dest-id, timestamp>. Поля source-id и dest-id используются для обнаружения спамеров и хакеров. Их значения говорят нам, кто и куда послал запрос. Допустим теперь, что в Teradata EDW имеется таблица watch-list ("список отслеживания"), в которой сохраняется список source-id, которые отслеживаются и используются для анализа тенденций изменения. В следующем SQL-запросе соединяются файл Hadoop packets.txt и таблица watch-list для нахождения списка source-id в таблице watch-list, из которых рассылались пакеты в более чем миллион уникальных dest-id.

SELECT watchlist.source-id, count(distinct(T.dest-id)) as Total FROM watchlist, TABLE(HDFSUDF(’packets.txt’)) AS T WHERE watchlist.source-id=T.source-id GROUP BY watchlist.source-id HAVING Total > 1000000

Приведенный пример показывает, что мы можем использовать подход табличной UDF для обеспечения простой возможности выполнения сложного анализа с применением процессора SQL над данными Hadoop и реляционными данными. В настоящее время мы работаем над более развитой версией HDFSUDF [1], позволяющей пользователям SQL объявлять отображение схем между файлами Hadoop и таблицами SQL, а также фильтровать и трансформировать данные с применением высокоуровневых конструкций SQL без потребности в написании кода на языке Java.


Интеграция Hadoop и параллельной СУБД


Ю Ксу, Пекка Костамаа, Лайк Гао
Перевод: Сергей Кузнецов


Ю Ксу, Пекка Костамаа, Лайк Гао
Перевод: Сергей Кузнецов




Ю Ксу, Пекка Костамаа, Лайк Гао
Перевод: Сергей Кузнецов



Оригинал: Yu Xu, Pekka Kostamaa, Like Gao. Integrating Hadoop and Parallel DBMS. Proceedings of the 2010 International Conference on Management of Data (SIGMOD 2010), June 6-11, 2010, Indianapolis, Indiana, USA, pp. 969-974

This translation is a derivative of ACM-copyrighted material. ACM did not prepare this translation and does not guarantee that it is an accurate copy of the originally published work. The original intellectual property contained in this work remains the property of ACM.



От переводчика: теперь и Teradata...


Честно скажу, я не в восторге от статьи, перевод которой вам предлагается. Она написана явно людьми "от сохи", технарями известнейшей компании, которые не балуют себя частым написанием исследовательских статей. Статья написана, мягко говоря, посредственно, в ней отсутствует описание экспериментов и т.д. Почему же я взялся за ее перевод?

Тому две причины. Во-первых, это для меня первая статья, касающаяся использования MapReduce в продукте компании, которая первой выпустила на рынок массивно-параллельную СУБД, пользующуюся мировым успехом на протяжении десятилетий. Компания Teradata для меня является большим авторитетом в области параллельных аналитических СУБД, и статьей о работах по интеграции с MapReduce, выполняемых в этой компании, я пренебречь просто не мог.

Во-вторых, мною двигала и чисто коллекционерская цель. За 2009-2010 гг. годы я прочитал и перевел несколько хороших статей, посвященных скрещиванию технологий MapReduce и массивно-параллельных баз данных:

Эндрю Павло, Эрик Паулсон, Александр Разин, Дэниэль Абади, Дэвид Девитт, Сэмюэль Мэдден, Майкл Стоунбрейкер.

Майкл Стоунбрейкер, Дэниэль Абади, Дэвит Девитт, Сэм Мэдден, Эрик Паулсон, Эндрю Павло и Александр Разин.

Джеффри Коэн, Брайен Долэн, Марк Данлэп, Джозеф Хеллерстейн, Кейлэб Велтон.

Эрик Фридман, Питер Павловски и Джон Кислевич.

Азза Абузейд, Камил Байда-Павликовски, Дэниэль Абади, Ави Зильбершац, Александр Разин. .

Я написал свою собственную обзорную статью . Эта тема продолжает оставаться для меня очень интересной, и я стараюсь не пропускать статей, которые ее как-нибудь затрагивают. А статья Ю Ксу и др., конечно, этой темы непосредственно касается. Авторы идут по пути, близкому пути Vertica. Они не пытаются скрестить Teradata с MapReduce, засунув одно в другое (для Teradata такие потрясения вряд ли допустимы), а предлагают механизмы для плодотворного сосуществования: средства разного рода экспорта данных из среды Hadoop в среду Teradata и наоборот.

Думаю, что статья будет, безусловно, интересна для пользователей Teradata (которых, насколько я знаю, в России и соседствующих с ней странах не так уж много), а также для всех специалистов и просто любознательных людей, которых интересуют перспективы систем управления аналитическими данными.



Параллельная загрузка данных Hadoop в Teradata EDW


В этом разделе мы представляем подход DirectLoad, который мы разработали для эффективной параллельной загрузки данных Hadoop в Teradata EDW. Сначала мы кратко описываем утилиту/протокол FastLoad [2], широко используемую в производственных условиях для загрузки данных в таблицы Teradata EDW. Клиент FastLoad, прежде всего, подключается к процессу Gateway, выполняющемуся в одном из узлов системы Teradata EDW, которая представляет из себя кластер узлов. Клиент FastLoad образует столько сессий, сколько указывается пользователем Teradata EDW. Каждый узел в системе Teradata EDW конфигурируется таким образом, что в нем выполняется несколько виртуальных параллельных компонентов, называемых AMP (Access Module Processor – процессор модуля доступ) [2]. В Teradate AMP является единицей параллелизма; он отвечает за выполнение сканирования, соединений и других задач управления данными над данными, которыми он управляет. Каждая сессия управляется одним AMP, и число сессий, образуемых клиентом FastLoad, Teradata EDW не может превосходить число AMP. Программное обеспечение Teradata Gateway является интерфейсом между Teradata EDW и клиентами, подключенными к сети. Процессы Teradata Gateway обеспечивают коммуникации и управляют ими, а также сообщениями клиентов и шифрованием.

После образования сессий клиент FastLoad посылает пакеты строк в подключенный процесс Gateway, адресуя их в циклическом стиле этим сессиям. Gateway перенаправляет строки в AMP-получатель, ответственный за сессию, которой адресованы эти строки, а затем AMP-получатель вычисляет для каждой строки значение хэш-функции (это значение вычисляется с использованием системной хэш-функции на столбце первичного индекса, задаваемой создателем таблиц или выбираемой автоматически системой баз данных). На основе вычисленных хэш-значений AMP-получатель посылает полученные им строки соответствующим целевым AMP, которые будут хранить эти строки в Teradata EDW. Для каждой строки, посылаемой клиентом FastLoad, AMP-получатель и Gateway могут располагаться в разных узлах.
Целевой AMP и AMP- получатель могут быть разными AMP и также могут выполняться в разных узлах. В действительности, для большинства строк, посылаемых клиентом FastLoad с использованием нескольких сессий, Gateway и AMP-получатель выполняются в разных узлах, и AMP-получатель и целевой AMP также выполняются в разных узлах.

При загрузке в Teradata EDW разделенного файла DFS, сохраняемого в нескольких узлах Hadoop, возникают возможности оптимизации, которые отсутствуют при использовании СУБД, выполняемой в одном SMP-узле, или традиционного подхода FastLoad. Основная идея нашего подхода DirectLoad состоит в устранении двух пересылок данных, присутствующих в существующем подходе FastLoad. Первая пересылка выполняется от процесса Gateway к AMP-получателю, а вторая – от AMP-получателя к целевому AMP. В нашем подходе DirectLoad клиенту разрешается посылать данные в любой AMP-получатель, указываемый клиентом DirectLoad (в отличие от циклического подхода, реализованного в FastLoad). Поэтому мы можем устранить пересылку от Gateway к AMP-получателю за счет использования только AMP-получателей в том же узле, к которому подключен клиент DirectLoad.

Для описания того, как работает подход DirectLoad, мы используем следующий простейший случай. Сначала мы решаем, какую часть файла Hadoop должен получить каждый AMD, а затем образуем столько заданий DirectLoad, сколько AMD имеется в Teradata EDW. Каждое задание DirectLoad подключается к некоторому процессу Gateway, читает назначенную ему часть файла Hadoop с использованием API Hadoop, и пересылает данные подключенному процессу Gateway, который посылает данные Hadoop только одному уникальному AMP в том же узле Teradata. Так можно сделать, потому что каждому заданию DirectLoad известно, к какому процессу Gateway/узлу он подключен, и он может попросить Teradata EDW обнаружить список AMD, поддерживаемых в том же узле.

Поскольку нас более всего интересует быстрая пересылка данных из Hadoop в Teradata EDW, мы делаем каждый AMD-получатель целевым AMD, управляющим полученными им строками.


Таким образом, вычислять значения хэш- функции на строках не требуется, и вторая пересылка в подходе DirectLoad устраняется. Однако при этом мы поступаемся тем, что над загружаемыми данными Hadoop не строится какой-либо индекс. Задания DirectLoad можно сконфигурировать таким образом, чтобы они выполнялись в системе Hadoop или же в системе Teradata EDW. Мы опускаем здесь обсуждение того случая, когда пользователю не угодно запускать столько заданий DirectLoad, сколько имеется AMP.

Наши предварительные эксперименты показывают, что DirectLoad может существенно превзойти FastLoad по производительности. В тестовой системе, которую мы использовали для экспериментов, имелось 8 узлов. В каждом узле имелось 4 процессора Pentium IV 3.6 GHz, 4 гигабайта основной памяти и два устройства с жесткими дисками, выделенных для использования в Teradata. Два других дисковых устройства предназначались для использования операционной системой и системой Hadoop (версия 0.20.1). В одной и той же тестовой системе функционировали и Teradata EDW, и Hadoop. В каждом узле запускались два AMP, чтобы можно было с пользой применять оба дисковых устройства, выделенных для целей Teradata.

Мы выполнили два эксперимента. В обоих экспериментах в одном задании FastLoad для загрузки данных Hadoop в Teradata EDW использовались 16 сессий. В данной системе максимальное число сессий, которое могло бы иметь задание FastLoad, равняется 16, посколько имеется всего 16 AMP. В подходе DirectLoad имелось по два задания DirectLoad на один узел, и в каждом задании DirectLoad использовалась одна сессия для посылки данных в локальный AMD. В обоих экспериментах в подходе DirectLoad одновременно имелось 16 активных сессий. В первом эксперименте мы генерировали DFS-файл с одним миллиардом строк. В каждой строке имелось два столбца. Во втором эксперименте мы генерировали DFS-файл со 150 миллионами строк. В каждой строке имелось 20 столбцов. Все столбцы были целого типа. В обоих экспериментах подход DirectLoad оказался примерно в 2,1 раза быстрее подхода FastLoad.Мы планируем выполнить большее число экспериментов при других конфигурациях системы.


Родственные работы


MapReduce вызывает огромный интерес как в индустрии, так и в академических кругах. Одно из направлений исследований состоит в повышении мощности или выразительности модели программирования MapReduce. В [19] предлагается добавить к набору примитивов MapReduce новый примитив MERGE, чтобы облегчить выполнение соединений в среде MapReduce, поскольку в MapReduce реализация соединений затруднительна. Pig Latin [14, 9] – это новый язык, разработанный Yahoo! в качестве "золотой середины" между декларативным стилем SQL и низкоуровневым процедурным стилем MapReduce. В проекте Facebook Hive [17] – разрабатывается решение с открытыми исходными кодами для построения хранилищ данных на основе Hadoop. В Hive обеспечивается SQL-подобный декларативный язык HiveQL, который компилируется в задания MapReduce, выполняемые в Hadoop.

В то время как [14, 9, 17, 4] помогают интегрировать конструкции декларативных запросов из РСУБД в MapReduce-подобную среду программирования для поддержки автоматической оптимизации запросов, достижения более высокой продуктивности программирования и большей выразительности запросов, другое направление исследований состоит в том, что исследователи и производители программных продуктов управления базами данных пытаются внедрить лучшие черты MapReduce, включая дружественность по отношению к пользователям и отказоустойчивость, в реляционные базы данных. HadoopDB [3] – это гибридная система, целью разработчиков которой является объединение лучших черт Hadoop и РСУБД. Основная идея HadoopDB состоит в соединении нескольких одноузловых систем баз данных (PostgreSQL) с использованием Hadoop в качестве координатора задач и уровня сетевых коммуникаций. Greenplum и Aster Data позволяют пользователям писать функции в стиле MapReduce над данными, хранимыми под управлением их параллельных систем [12].

Родственной работой по отношению к описанному в разд. 3 подходу TeradataInputFormat является реализация VerticaInputFormat компании Vertica [18], обеспечивающая программам MapReduce прямой доступ к реляционным данным, хранимым в параллельной СУБД Vertica (эта реализация также инспирирована DBInputFormat [7], но не основана на реализации данного интерфейса).
Однако в реализации Vertica (как и в реализации DBInputFormat) в СУБД посылается столько SQL-запросов ( в каждом из которых, как и в подходе DBInputFormat, к SQL-запросу, предоставленному пользователем, добавляется один раздел LIMIT и один раздел OFFSET), сколько имеется Mapper'ов в Hadoop, хотя каждый Mapper случайным образом выбирает для подключения узел кластера Vertica.

В нашем подходе TeradataInputFormat каждый Mapper также случайным образом подключается к узлу Teradata EDW. Однако, по нашему опыту, это не приводит к существенному повышению производительности программ MapReduce, поскольку все запросы параллельно выполняются во всех узлах, независимо от того, в какой узел посылался конкретный запрос. Ключевым фактором высокой производительности подхода TeradataInputFormat является то, что специфицируемые пользователями запросы выполняются только один раз, а не столько раз, сколько имеется Mapper'ов, как это происходит в DBInputFormat и VerticaInputFormat.

Дополнительным (не всегда применимым) оптимизационным приемом в VerticaInputFormat является то, что когда пользователь задает параметризованный SQL-запрос типа “SELECT * FROM T WHERE c=?”, в VerticaInputFormat поддерживается список значений параметра для разных Mapper'ов, и значения параметра обеспечиваются пользователем во время выполнения. И опять же число SQL-запросов, посылаемых в кластер Vertica, совпадает с числом Mapper'ов.


TeradataInputFormat


Основная идея нашего подхода заключается в том, что коннектор Teradata для Hadoop (TeradataInputFormat) посылает в Teradata EDW SQL-запрос Q, обеспечиваемый программой MapReduce, только один раз, и результаты сохраняются в некоторой PPI-таблице T (PPI – Partitioned Primary Index, разделенный первичный индекс). После этого каждый Mapper системы Hadoop посылает новый запрос Qi, в котором всего лишь в каждом AMP запрашивается i-ый раздел.

Обсудим теперь нашу реализацию более подробно. Прежде всего, класс TeradataInputFormat посылает в Teradata EDW следующий запрос P, основанный на запросе Q, который предоставляется программой MapReduce.

CREATE TABLE T AS (Q) WITH DATA PRIMARY INDEX ( c1 ) PARTITION BY (c2 MOD M) + 1 (P)

В этом запросе требуется, чтобы в Teradata EDW был выполнен запрос Q, и чтобы результаты были сохранены в новой PPI-таблице T. Хэш-значение столбца первичного индекса c1 каждой строки результата запроса определяет, в каком AMP должна храниться эта строка. После этого значение выражения, указанного в разделе Partition By, определяет физический раздел (местоположение) каждой строки в конкретном AMP. В одном AMP все строки с одним и тем же значением выражения Partition By физически хранятся совместно и могут быть напрямую и эффективно найдены Teradata EDW. Мы опустим детали того, каким образом мы автоматически выбираем столбец первичного индекса и выражение Partition By. После выполнения запроса Q и создания таблицы T в каждом AMP имеется M разделов с номерами от 1 до M (M – число Mapper'ов, запущенных в Hadoop). В качестве одного из дополнительных вариантов мы думаем разрешить опытным программистам самим задавать выражение Partition By через интерфейс TeradataInputFormat, чтобы получить более тонкое программное управление над тем, как следует разделять результаты запросов (конечно, это возможно, только если программистам хорошо известна демография данных).

Затем каждый Mapper посылает в Teradata EDW следующий запрос Qi (1 ≤ i ≤ M):


SELECT * FROM T WHERE PARTITION = i (Qi)

Teradata EDW напрямую параллельно определит местоположение всех строк i-го раздела каждого AMP и вернет эти строки i-му Mapper'у. Эта операция выполняется параллельно для всех Mapper'ов. После того как все Mapper'ы получат свои данные, таблица T удаляется.

Заметим, что если в исходном SQL-запросе всего лишь выбираются данные из базовой таблицы, которая является PPI-таблицей, то мы не создаем еще одну PPI-таблицу (T), поскольку можем непосредственно использовать существующие разделы для разделения данных, которые должен получить каждый Mapper.

В настоящее время у PPI-таблицы в Teradata EDW должен иметься столбец первичного индекса. Поэтому при вычислении запроса P системе Teradata EDW требуется разделить результаты запроса между всеми AMP в соответствии со значениями столбца первичного индекса. Одной из возможных в будущем оптимизаций является параллельное построение разделов результатов запроса в каждом AMP без перемещения результатов SQL-запроса Q между AMP. Еще одна возможная оптимизация состоит в том, что для построения M разделов нам в действительности не требуется сортировать строки в каком-либо AMP на основе значений выражения Partition By. Для наших целей мы может использовать здесь "номера псевдоразделов": первой 1/M-части строк результата запроса в любом AMP можно назначить номер раздела 1, ..., последней 1/M-части строк результата запроса в любом AMP можно назначить номер раздела M.

Заметим, что данные, выбираемые программой MapReduce через интерфейс TeradataInputFormat, не сохраняются в Hadoop после завершения программы MapReduce (если только их не сохранит сама программа MapReduce). Поэтому, если какие-то данные Teradata EDW часто используются многими программами MapReduce, более эффективно будет скопировать эти данные и материализовать их в Hadoop в виде файлов Hadoop DFS.

В зависимости от числа Mapper'ов, сложности SQL-запроса, предоставляемого программой MapReduce, и объема данных, затрагиваемых этим SQL-запросом, производительность подхода TeradataInputFormat, очевидным образом, может на порядки величин превышать производительность подхода DBInputFormat, что подтверждается предварительными результатами тестирования.



Подход TeradataInputFormat, описанный в этом подразделе, можно назвать подходом, основанным на горизонтальном разделении, в том смысле, что каждый Mapper выбирает часть результатов запроса из каждого AMP (узла). В настоящее время мы исследуем подход, основанный на вертикальном разделении, когда несколько Mapper'ов выбирают данные только из одного AMP при M > A (M – число Mapper'ов, запущенных Hadoop, и A – число AMP в Teradata EDW), или когда каждый Mapper выбирает данные из некоторого подмножества AMP при M < A, или когда каждый Mapper выбирает данные из одного и только одного AMP при M = A. Для реализации подхода, основанного на вертикальном разделении, в текущем варианте Teradata EDW требуется больше изменений, чем для реализации подхода, основанного на горизонтальном разделении. Мы предполагаем, что производительность любого из этих подходов не всегда будет превосходить производительность другого подхода.


Distributed File System, DFS) широко


Распределенные файловые системы ( Distributed File System, DFS) широко используются в поисковых системах для хранения огромного объема данных, собираемых в Internet, поскольку DFS обеспечивают масштабируемое, надежное и экономичное решение хранения данных. Компании, специализирующиеся на разработке поисковых систем, также создают на основе DFS параллельные вычислительные платформы для параллельного выполнения крупномасштабного анализа данных, сохраняемых в DFS. Например, у Google имеются GFS [10] и MapReduce [8]. Yahoo! использует Hadoop [11] – реализацию с открытыми исходными текстами, выполненную Apache Software Foundation и основанную на GFS и MapReduce компании Google. Компания Ask.com построила Neptune [5]. У Microsoft имеются Dryad [13] и Scope [4].
Hadoop привлекает внимание большого сообщества пользователей по причине открытости кодов и наличия серьезной поддержки со стороны Yahoo!. В Hadoop файлы разбиваются на блоки, и каждый блок несколько раз реплицируется в разных узлах для обеспечения отказоустойчивости и распараллеливания вычислений. Обычно Hadoop выполняется в кластерах, построенных на основе недорогой аппаратуры массового спроса. Hadoop легко устанавливается, и системой просто управлять. Загрузка данных в DFS производится более эффективно, чем в параллельную СУБД [15].
Текущая тенденция состоит в том, что компании начинают использовать Hadoop для крупномасштабного анализа данных. Хотя для начала использования Hadoop требуются совсем небольшие расходы, обычно Hadoop MapReduce значительно уступает параллельным СУБД в производительности: Hadoop в 2-3 раза медленнее, чем параллельная СУБД, решает простейшую задачу подсчета числа вхождений разных слов в файле/таблице, и в десятки раз медленнее справляется с более сложными задачами анализа данных [15]. Кроме того, программы MapReduce для сложного анализа данных пишутся гораздо дольше, чем соответствующие SQL-запросы. Нам известно, что одна из крупных Internet-компаний, имеющая крупные кластеры с Hadoop, переходит к использованию параллельной СУБД для производства некоторых наиболее сложных аналитических отчетов, поскольку руководители компании не удовлетворены тем, что в обстановке постоянно изменяющихся и усложняющихся бизнес-требований им приходится ждать по несколько дней, пока будут написаны и отлажены требуемые сложные программы MapReduce.


С другой стороны, из-за того, что в вычислительных центрах некоторых заказчиков Teradata в последние годы наблюдается быстрый рост объемов данных, некоторые данные, такие как Web-журналы, детальные данные об обращениях клиентов, сенсорные данные и данные RFID не управляются Teradata EDW. Частично это связано с очень высокой стоимостью загрузки этих исключительно объемных данных в РСУБД, особенно, если эти данные не слишком часто используются для поддержки принятия важных бизнес-решений.
Некоторые заказчики Teradata для хранения своих исключительно объемных данных используют DFS, поскольку DFS обеспечивают им ряд преимуществ. Например, одна из основных компаний, специализирующаяся на производстве телекоммуникационного оборудования, планирует протоколировать все действия пользователей по отношению ко всем своим устройствам, и журналы исходно будут сохраняться в DFS, но в конечном счете некоторые или все эти журналы должны будут управляться параллельной СУБД для выполнения над ними сложного бизнес-анализа.
Тем самым, у крупных компаний, имеющих данные, которые сохраняются в DFS и в Teradata EDW, имеется сильная бизнес-потребность в интеграции бизнес-анализа над данными обоих типов. Аналогичным образом, те компании, которые изначала стали использовать низкозатратный подход Hadoop, а теперь нуждаются в использовании параллельной СУБД, подобной Teradata, для обеспечения более высокой производительности и более развитых функциональных возможностей, испытывают насущную потребность в средствах интегрированного анализа данных Hadoop и данных, хранимых в Teradata EDW.
Очевидно, что первым важным шагом, требуемым для интеграции бизнес-анализа над данными, хранимыми в средах Hadoop и Teradata EDW, является обеспечение эффективной пересылки данных между этими средами. Прямолинейный подход, не требующий каких-либо новых разработок ни со стороны Hadoop, ни со стороны Teradata EDW, заключается в использовании имеющихся утилит загрузки и экспорта: файлы Hadoop можно скопировать в обычные файлы, которые можно загрузить в Teradata EDW, а таблицы из Teradata EDW можно экспортировать в файлы, которые можно загрузить в Hadoop (или использовать в потоковом стиле без материализации промежуточных файлов).


Однако одной из общих черт Hadoop и Teradata EDW является то, что данные в обеих системах для обеспечения параллельной обработки разделяются по нескольким узлам, что обеспечивает возможности оптимизации, недоступные для СУБД, выполняющихся в одном узле. В этой статье мы описываем три свои работы, направленные на достижение тесной и эффективной интеграции Hadoop и Teradata EDW.

Мы обеспечиваем утилиту полностью параллельной загрузки, называемую DirectLoad, для эффективной загрузки данных Hadoop в Teradata EDW. Ключевая идея подхода DirectLoad состоит в том, что сначала мы приписываем каждый блок данных файла Hadoop некоторому параллельно компоненту Teradata EDW, а затем напрямую параллельно загружаем данные в параллельные компоненты. Для поддержки подхода Teradata EDW мы также применяем внутри Teradata EDW новые методы для минимизации перемещения данных между узлами.
Мы обеспечиваем коннектор для Hadoop под названием TeradataInputFormat, который позволяет программам MapReduce напрямую читать данные из Teradata EDW через драйверы JDBC без потребности в каких-либо внешних шагах экспортирования данных (из СУБД) и их загрузки в Hadoop. TeradataInputFormat инспирирован подходом DBInputFormat [7], разработанным компанией Cloudera [6], но не основывается на нем. В отличие от подхода DBInputFormat, в котором каждый Mapper посылает в СУБД некоторый бизнес-запрос, представленный на SQL (и, таким образом, этот SQL-запрос выполняется столько раз, сколько имеется Mapper'ов Hadoop), коннектор TeradataInputFormat посылает в Teradata EDW бизнес-запрос только один раз, этот SQL-запрос выполняется только единожды, и каждый Mapper в параллель получает некотрую часть результатов прямо из узлов Teradata EDW.
Мы обеспечиваем табличную UDF (User Defined Function – определяемая пользователями функция), которая при вызове из любого стандартного SQL-запроса выполняется в каждом параллельном компоненте Teradata EDW для параллельной выборки данных Hadoop прямо из узлов Hadoop. Любые реляционные таблицы можно соединить с данными Hadoop, выбираемыми этой табличной UDF, и любое средство бизнес-анализа, обеспечиваемое процессором SQL Teradata, можно применить как к реляционным данным, так и к данным Hadoop.Не требуются какие-либо внешние шаги для экспортирования данных Hadoop и их загрузки в Teradata EDW.
Оставшаяся часть статьи организована следующим образом. В разд. 2, 3 и 4 мы обсуждаем по очереди три вышеупомянутых подхода. В разд. 5 мы обсуждаем родственные работы. Разд. 6 содержит заключение.

Выборка данных EDW в программах MapReduce


В этом разделе мы обсуждаем подход TeradataInputFormat, позволяющий программам MapReduce напрямую читать данные Teradata EDW через драйверы JDBC без потребности в каких-либо внешних шагах экспортирования (из Teradata EDW) и загрузки данных в Hadoop. Прямолинейный подход, обеспечивающий программам MapReduce доступ к реляционным данным, состоит в том, что сначала используется утилита СУБД для экспорта результатов требуемых SQL-запросов в локальный файл, а затем этот локальный файл загружается в Hadoop (или результаты запросов используются в потоковом стиле без потребности в промежуточном файле). Однако программисты MapReduce часто считают более удобным и продуктивным прямой доступ к реляционным данным из своих программ MapReduce без привлечения внешних шагов экспортирования данных из СУБД (для чего требуется знание скриптового языка экпорта данных) и их загрузки в Hadoop.

Признавая потребность интеграции реляционных данных в программах Hadoop MapReduce, компания-стартап Cloudera [6], которая специализируется на коммерциализации продуктов и сервисов, связанных с Hadoop, обеспечивает несколько Java-классов (в основном, DBInputFormat [7]), входящих теперь в основной дстрибутив Hadoop и позволяющих программам MapReduce посылать SQL-запросы через стандартный интерфейс JDBC для параллельного доступа реляционных данных. Поскольку наш подход TeradataInputFormat инспирирован подходом DBInputFormat (но не основывается на нем), мы сначала кратко опишем, как работает подход DBInputFormat, а затем обсудим подход TeradataInputFormat.



с MapReduce, продолжают активно развиваться


Исследования, связанные с MapReduce, продолжают активно развиваться и вызывают интерес как в индустрии, так и в академических кругах. Подход MapReduce особенно интересен для производителей параллельных СУБД, поскольку и в MapReduce, и в РСУБД используются кластеры узлов и масштабируемая технология анализа данных. Крупные заказчики Teradata все чаще сталкиваются с потребностью выполнения интегрированного анализа данных, хранимых и в среде Hadoop, и в Teradata EDW. Мы представили три исследовательские работы, направленные на достижение тесной интеграции Hadoop и Teradata EDW.
Наш подход DirectLoad обеспечивает быструю параллельную загрузку данных Hadoop в Teradata EDW. Наш подход TeradataInputFormat дает программам MapReduce возможность эффективного и прямого параллельного доступа к данным Teradata EDW без потребности во внешних шагах экспортирования и загрузки данных из Teradata EDW в Hadoop. Мы также продемонстрировали, каким образом пользователи SQL могут напрямую обращаться к данным Hadoop и соединять их с данными Teradata EDW с применением определяемых пользователями функций.
Хотя результаты работ, описанных в этой статье, могут удовлетворить потребности большого числа заказчиков Teradata, нуждающихся в совместном использовании данных Hadoop и Teradata EDW в своей среде корпоративного хранилища данных, имеется еще много проблем, над решением которых мы продолжаем работать. Одной из проблем, которые нас более всего интересуют, является возможность переноса большего объема вычислений из Hadoop в Teradata EDW и из Teradata EDW в Hadoop.