Daytona - Iterative MapReduce on Windows Azure
Overview
MapReduce is a framework for processing highly distributable problems across huge datasets using a large number of compute nodes. It is a generic mechanism that comprises 2 steps:
- Map step: The master node takes the input, partitions it up into smaller sub-problems, and distributes them to worker nodes. The worker node processes the smaller problem, and passes the answer back to its master node.
- Reduce step: The master node then collects the answers to all the sub-problems and combines them in some way to form the output – the answer to the problem it was originally trying to solve.
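To make these two steps concrete, here is a purely local word-count sketch in plain C#/LINQ. It only illustrates the pattern (it is not the Daytona API); the Daytona version of the same word count appears in the walkthrough later in this post.
using System;
using System.Collections.Generic;
using System.Linq;

class MapReduceSketch
{
    static void Main()
    {
        // "Map": emit a (word, 1) pair for every word in every input line.
        var lines = new[] { "the quick brown fox", "the lazy dog" };
        IEnumerable<KeyValuePair<string, int>> mapped = lines.SelectMany(line =>
            line.Split(' ').Select(word => new KeyValuePair<string, int>(word, 1)));

        // "Reduce": group the intermediate pairs by key and combine the values.
        var reduced = mapped
            .GroupBy(pair => pair.Key)
            .Select(g => new KeyValuePair<string, int>(g.Key, g.Sum(pair => pair.Value)));

        foreach (var pair in reduced)
        {
            Console.WriteLine("{0}: {1}", pair.Key, pair.Value);
        }
    }
}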
Microsoft has developed an iterative MapReduce runtime for Windows Azure, code-named "Daytona". It can be downloaded here: http://research.microsoft.com/en-us/downloads/cecba376-3d3f-4eaf-bf01-20983857c2b1/default.aspx
Daytona can scale out computations for analysis of distributed data. Developers can use this to construct distributed computing cloud data-analysis services on Windows Azure.
This is Microsoft's equivalent of Hadoop; the competing distributed computing infrastructure and API, Dryad + LINQ to HPC, is now redundant.
An application has already been developed that leverages this infrastructure - Excel DataScope: an analytics service that exposes a library for cluster analysis, outlier detection, classification, machine learning, and data visualization. Users can upload data in their Excel spreadsheet to the DataScope service or select a data set already in the cloud, and then select an analysis model from the Excel DataScope research ribbon to run against the selected data. Daytona will scale out the model processing to perform the analysis. The results can be returned to the Excel client or remain in the cloud for further processing and/or visualization.
Development steps:
- Develop arbitrary data analytics algorithms as a set of Map and Reduce tasks.
- Upload the data and algorithms library into Windows Azure blob storage.
- Deploy the Daytona runtime to Windows Azure - configure the number of virtual machines and the storage for analysis results.
- From a client, launch the algorithm execution.
Daytona will automatically deploy the iterative MapReduce runtime to all of the configured Azure VMs, sub-dividing the data into smaller chunks so that they can be processed (the “map” function of the algorithm) in parallel. Eventually, it recombines the processed data into the final solution (the “reduce” function of the algorithm). Azure storage serves as the source for the data that is being analyzed and as the output destination for the end results. Once the analytics algorithm has completed, you can retrieve the output from Azure storage or continue processing the output by using other analytics model(s).
Key Properties:
- Designed for the cloud, specifically for Windows Azure - the scheduling, network communications, and fault tolerance logic leverages the Azure fabric infrastructure.
- Designed for cloud storage services - a streaming-based data-access layer for cloud data sources (currently, Windows Azure blob storage only), which can partition data dynamically and support parallel reads. Intermediate data can reside in memory or on local non-persistent disks with backups in blobs, so that it is possible to consume data with minimal overhead and with the ability to recover from failures. A distributed file system is not required, as the Azure storage services' automatic persistence and replication are leveraged.
- Horizontally scalable and elastic - computations are performed in parallel, so scaling a computation just requires adding more virtual machines to the deployment compute pool and the infrastructure will take care of the rest. This allows focusing on data exploration instead of concerns about acquiring compute capacity or hardware management.
- Optimized for iterative, convergent algorithms - supported in the core runtime, which caches data between iterations to reduce communication overheads and provides different scheduling and relaxed fault tolerance mechanisms. This is exposed via an API for authoring iterative algorithms.
Architecture
The Daytona runtime consists of two worker roles deployed within a single Windows Azure service.
- Master worker role - the service will run only a single instance of it.
- Slave worker role - the service will run multiple instances of it. The Master will assign one or more tasks to a Slave instance.
Applications will submit one or more jobs to the Master. A Job consists of one or more tasks, and a task can be one of the following task types:
- Map task – invokes a user-defined function that processes a KVP (key-value pair) to generate a set of intermediate KVPs.
- Reduce task - invokes a user-defined function that processes all values associated with an intermediate key and generates a final set of KVPs.
Terminology
- Application - an abstraction that contains one or more MapReduce computations composed by the user. An Application is composed of (1) a Package, (2) a Controller and (3) Controller Arguments.
- Package - the collection of assemblies that compose this Application, referenced by name.
- Controller - takes arguments passed in from an external client during application submission and configures, creates and controls jobs on behalf of this application.
- Job - a single MapReduce computation. An Application can contain multiple MapReduce jobs. Each job is defined by a job configuration and a set of parameters.
- Task – a Map or Reduce task of a Job.
- Client Interface – an interface that is used to submit applications to the Daytona runtime.
Operational Process Steps:
- Submit, Partition & Assign - The Master picks an application that has been submitted using the client interface. The Master creates the user-defined Controller instance of the application. The Controller contains the logic for creating and submitting jobs for execution. The runtime will automatically generate the appropriate Map/Reduce tasks per job and schedule them for execution on the slaves. The Controller will leverage a user-defined DataPartitioner to split the input dataset into a number of data partitions. A map task is assigned to each data partition, while the number of reduce tasks is left for the user to define based on the expected size of output generated by the map tasks. Each slave instance is then assigned one or more map tasks by the Master.
- Map & Combine - A slave instance which has been assigned a map task reads the content of the corresponding data partition and parses it into KVPs through a user-defined Reader. Each KVP is then sequentially passed to the user-defined Map function, which produces a set of intermediate KVPs. The intermediate KVPs are partitioned among the number of reduce tasks specified by the user and serialized in the slave instance's memory or local file storage. (Optionally, users can specify a Combiner to locally merge intermediate values for each intermediate key. Users can also control which keys go to which reduce task by specifying a KeyPartitioner.) The slave instance then notifies the Master about completion of the task.
- Reduce - As soon as the Master is notified of the completion of one of the map tasks, it assigns the reduce tasks to the slave instances. A slave which has been assigned a reduce task is provided with the details for downloading its input data (intermediate KVPs generated by map tasks). The data exchange is done via inter-role communication. When the slave has downloaded all its input data, it de-serializes and groups it by the intermediate keys, as multiple map tasks could have generated the same key. The keys are then sorted and sequentially passed one by one, along with their values, to the user-defined Reduce function. The output of the reduce function is then persisted through a user-defined writer.
The runtime uses a data caching layer and an optimized scheduling mechanism to support iterative MapReduce jobs, i.e. execution of the same job in a loop within an iterative MapReduce computation.
API
Algorithm deployment to Daytona requires creating a Map function and a Reduce function. A Controller must be created to handle job configuration, submission and management on the Master node within the Daytona runtime.
Use the MapReduceClient class to interact with the runtime - it is responsible for packaging the various components, uploading them to the runtime and submitting the application to be run on the backend.
MapReduceClient Class
This class facilitates interaction with the Daytona runtime by providing methods for submitting an application and tracking its execution status. Optionally, this class can receive a set of string properties from the Controller of the application at the end of execution as a means of communicating results.
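Here is a minimal sketch of the client-side flow; the same calls appear in the submission and tracking snippets later in this post, and the connection string values are placeholders.
// Submit an application and wait for it to finish.
var masterConnectionString =
    "DefaultEndpointsProtocol=https;AccountName=<AccountName>;AccountKey=<AccountKey>";

var client = new MapReduceClient(
    "word-count-" + Guid.NewGuid(),
    typeof(WordCountController).AssemblyQualifiedName);

client.Submit(masterConnectionString, false);

// Poll for up to 10 minutes, checking every 5 seconds.
bool concluded = client.WaitForCompletion(
    TimeSpan.FromMinutes(10), TimeSpan.FromSeconds(5));

if (concluded && client.Succeeded)
{
    // Results is the property bag populated by the Controller.
    foreach (KeyValuePair<string, string> result in client.Results)
    {
        Console.WriteLine("{0}: {1}", result.Key, result.Value);
    }
}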
Controller Class
Is invoked and executed per application within the Daytona runtime to manage job submissions on behalf of its corresponding application. The controller composes the jobs and orchestrates their flow within an application. For example, a controller of a typical MapReduce application may create one or more jobs and execute them one after the other to form a workflow (or a chain of jobs). Similarly, the controller of an iterative application may execute a job in a looping construct. Additionally, it can be used to perform any pre- and post-processing activities of an application or to evaluate convergence criteria in the case of an iterative application.
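As a sketch of the looping pattern, here is what an iterative controller might look like. MyMapper, MyReducer, HasConverged and the container names are hypothetical placeholders; the Controller/JobConfiguration/Job usage mirrors the word-count walkthrough below, which also defines BlobContainerTextPartitioner and BlobTextCsvWriter.
[ControllerAttribute(
    Name = "Iterative Example",
    Description = "Runs the same MapReduce job in a loop until convergence.")]
public sealed class IterativeController : Controller
{
    public override void Run()
    {
        // One static configuration reused for every iteration.
        var jobConf = new JobConfiguration
        {
            MapperType = typeof(MyMapper),     // hypothetical placeholder
            ReducerType = typeof(MyReducer),   // hypothetical placeholder
            JobTimeout = TimeSpan.FromMinutes(10)
        };

        string inputContainer = "iteration-input-0";  // placeholder initial input
        int iteration = 0;
        bool converged = false;
        while (!converged && iteration < 20)
        {
            string outputContainer = "iteration-output-" + iteration;
            var job = new Job(jobConf, this)
            {
                DataPartitioner = new BlobContainerTextPartitioner(
                    this.CloudClient, inputContainer),
                RecordWriter = new BlobTextCsvWriter(
                    this.CloudClient, outputContainer),
                NoOfReduceTasks = 2
            };

            job.Run();

            converged = HasConverged(outputContainer); // hypothetical convergence check
            inputContainer = outputContainer;          // feed this output into the next pass
            iteration++;
        }

        this.Results.Add("Iterations", iteration.ToString());
    }

    private bool HasConverged(string outputContainer)
    {
        // Placeholder: inspect the iteration's output in blob storage and
        // decide whether another pass is needed.
        return false;
    }
}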
JobConfiguration Class
Captures the static configuration and parameters of a Job. For example, the Map class type, the Reduce class type and other parameters such as the exception policy are configured using this. A single JobConfiguration can be shared between multiple jobs.
Job Class
Captures the dynamic configurations and parameters of a job and provides methods for running and closing a job.
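Putting JobConfiguration and Job together, here is a minimal sketch (inside a controller's Run method) of one configuration shared by two chained jobs. MyMapper, MyReducer and the container names are placeholders; the partitioner and writer are the ones from the walkthrough below.
// One JobConfiguration shared by two chained jobs; only the job-level
// settings (partitioner, writer) differ between the two stages.
var jobConf = new JobConfiguration
{
    MapperType = typeof(MyMapper),    // hypothetical placeholder
    ReducerType = typeof(MyReducer),  // hypothetical placeholder
    ExceptionPolicy = new TerminateOnFirstException(),
    JobTimeout = TimeSpan.FromMinutes(10)
};

var stage1 = new Job(jobConf, this)
{
    DataPartitioner = new BlobContainerTextPartitioner(this.CloudClient, "stage1-input"),
    RecordWriter = new BlobTextCsvWriter(this.CloudClient, "stage1-output"),
    NoOfReduceTasks = 2
};
stage1.Run();

// The second job consumes the first job's output container.
var stage2 = new Job(jobConf, this)
{
    DataPartitioner = new BlobContainerTextPartitioner(this.CloudClient, "stage1-output"),
    RecordWriter = new BlobTextCsvWriter(this.CloudClient, "stage2-output"),
    NoOfReduceTasks = 2
};
stage2.Run();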
CloudClient Class
Provides a serializable wrapper for accessing different cloud clients {CloudBlobClient, CloudTableClient, CloudQueueClient } belonging to a particular storage account.
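For example, the partitioner and writer in the walkthrough below hold a CloudClient and use its BlobClient like this (the container name is a placeholder):
// Resolve a blob container via the serializable CloudClient wrapper.
CloudBlobContainer container =
    this.CloudClient.BlobClient.GetContainerReference("word-count-input");
container.CreateIfNotExist();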
IDataPartitioner<K,V> & IRecordReader<K,V> Interfaces
Data partitioning provides the functionality for splitting the input dataset into partitions, as well as parsing the contents of each partition to generate KVPs. Data partitioning can be provided by implementing the interface IDataPartitioner, and each split generated should implement the empty interface IDataPartition.
The types implementing IDataPartitioner and IDataPartition should be marked as Serializable. This is because the Daytona runtime runs each application in a dedicated AppDomain, and instances of these types need to be passed across AppDomains.
IMapper<KIN, VIN, KOUT, VOUT> Interface
Provides the functionality for processing the KVPs generated from parsing each individual partition. It then generates intermediate KVPs as a result of that processing.
IKeyPartitioner<K,V> Interface
Key partitioning provides the functionality for splitting/distributing the intermediate KVPs generated after executing Map/Combine among the reduce tasks. The count of reduce tasks is provided to the partitioner through the Partition method.
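The walkthrough below uses the library's HashModuloKeyPartitioner<string, int>. The underlying idea, sketched here as a standalone helper rather than the SDK's own implementation, is to hash each key and take the remainder modulo the reduce-task count:
// Illustration of hash-modulo key partitioning (not the SDK's own code):
// every occurrence of a key maps to the same reduce task, and keys spread
// roughly evenly across the available tasks.
static int PartitionForKey(string key, int noOfReduceTasks)
{
    return (key.GetHashCode() & int.MaxValue) % noOfReduceTasks;
}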
IReducer<KIN, VIN, KOUT, VOUT> Interface
A Combiner can perform an optional step immediately after the Map so as to reduce the input payload of Reduce tasks. Typically, the same implementation is shared for reduce and combine. Provides the functionality to process all values associated with an intermediate key and generate a final KVP as output.
IRecordWriter<K,V> Interface
Provides the functionality to write the output KVPs generated by the IReducer implementation.
If the output is large and/or needs to be persisted, then one of the Azure storage services or SQL Azure may be used. If the output is small and needs to be sent to the master for a final merge, then it can be written to memory.
Context, MapContext & ReduceContext Classes
The context class provides additional information to various components during the configuration stage. The Context class is specialized through the MapContext and ReduceContext classes, which inherit from it and provide additional context only relevant to those two components.
The Library Namespace
Research.MapReduce.Library provides a set of commonly used functionality for data partitioning, key partitioning and output writing.
Type integrity
Type integrity must be maintained in the configuration across the various components; the generic parameters of each component must line up with those of the next:
public interface IDataPartitioner<K, V>
public interface IRecordReader<K, V>
public interface IMapper<KIN, VIN, KOUT, VOUT> where KOUT : IComparable<KOUT>
public interface IReducer<KIN, VIN, KOUT, VOUT> where KIN : IComparable<KIN>
public interface IRecordWriter<K, V>
IEnumerable<IReduceResult<K, V>> GetReduceOutputs<K, V>()
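To make that concrete, here is how the generic parameters line up in the word-count walkthrough below; the output key/value types of each component match the input types of the next:
// Type flow in the word-count walkthrough below:
//
//   BlobContainerTextPartitioner : IDataPartitioner<int, string>
//   BlobTextReader               : IRecordReader<int, string>
//   WordCountMapper              : IMapper<int, string, string, int>
//   WordCountReducer             : IReducer<string, int, string, string>
//   BlobTextCsvWriter            : IRecordWriter<string, string>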
Walkthrough – A Word Count MapReduce Implementation
Controller
[ControllerAttribute(
Name = "Word Count",
Description = "Counts #unique words found in input data.")]
public sealed class WordCountController : Controller
{
public override void Run()
{
string outputContainerName
= "word-count-output" + Guid.NewGuid().ToString("N");
var jobConf = new JobConfiguration
{
MapperType = typeof(WordCountMapper),
CombinerType = typeof(WordCountCombiner),
ReducerType = typeof(WordCountReducer),
MapOutputStorage = MapOutputStoreType.Local,
KeyPartitioner = typeof(HashModuloKeyPartitioner<string, int>),
ExceptionPolicy = new TerminateOnFirstException(),
JobTimeout = TimeSpan.FromMinutes(10)
};
var job = new Job(jobConf, this)
{
DataPartitioner = new BlobContainerTextPartitioner(
this.CloudClient, "word-count-input"),
RecordWriter = new BlobTextCsvWriter(
this.CloudClient, outputContainerName),
NoOfReduceTasks = 2
};
// Run the job:
job.Run();
this.Results.Add("OutputContainer", outputContainerName);
}
}
Data Partitioner
Splits an input container whose blobs contain text data in CSV format.
[Serializable]
public class BlobContainerTextPartitioner : IDataPartitioner<int, string>
{
public string ContainerName { get; private set; }
public CloudClient CloudClient { get; private set; }
public BlobContainerTextPartitioner(
CloudClient cloudClient, string containerName)
{
if (cloudClient == null)
{
throw new ArgumentNullException("cloudClient");
}
if (string.IsNullOrEmpty(containerName)
|| string.IsNullOrWhiteSpace(containerName))
{
throw new ArgumentNullException("containerName");
}
this.CloudClient = cloudClient;
this.ContainerName = containerName;
}
public IEnumerable<IDataPartition> GetPartitions()
{
// Return one BlobPartition per blob in the container.
CloudBlobContainer container
= CloudClient.BlobClient.GetContainerReference(this.ContainerName);
foreach (IListBlobItem blobItem in container.ListBlobs())
{
CloudBlob blob = container.GetBlobReference(blobItem.Uri.AbsoluteUri);
blob.FetchAttributes();
yield return new BlobTextPartition(blob, 0, blob.Properties.Length, true);
}
}
public IRecordReader<int, string> GetRecordReader(IDataPartition partition)
{
return new BlobTextReader(this.CloudClient, partition as BlobTextPartition);
}
}
Mapper
public sealed class WordCountMapper : IMapper<int, string, string, int>
{
public IEnumerable<KeyValuePair<string, int>> Map(
int key, string value, MapContext<int, string> context)
{
foreach (string word in Regex
.Split(value, "[^a-zA-Z0-9]", RegexOptions.Singleline)
.Where(tuple => !string.IsNullOrEmpty(tuple)))
{
yield return new KeyValuePair<string, int>(word, 1);
}
}
public void Configure(MapContext<int, string> context)
{
// No per-task configuration is needed for this mapper.
}
}
Reducer
public sealed class WordCountReducer : IReducer<string, int, string, string>
{
public IEnumerable<KeyValuePair<string, string>> Reduce(
string key,
IEnumerable<int> values,
ReduceContext<string, int> context)
{
return new KeyValuePair<string, string>[]
{
new KeyValuePair<string, string>(key, values.Sum().ToString())
};
}
public void Configure(ReduceContext<string, int> context)
{
// No per-task configuration is needed for this reducer.
}
}
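The controller above also configures a WordCountCombiner, which isn't shown in the original sample. Here is a plausible sketch, assuming the combiner implements the same IReducer interface (as described earlier) and emits int counts so that its output matches the reducer's input types:
public sealed class WordCountCombiner : IReducer<string, int, string, int>
{
    public IEnumerable<KeyValuePair<string, int>> Reduce(
        string key,
        IEnumerable<int> values,
        ReduceContext<string, int> context)
    {
        // Locally pre-aggregate the counts for each word so that less data
        // has to be shipped to the reduce tasks.
        yield return new KeyValuePair<string, int>(key, values.Sum());
    }

    public void Configure(ReduceContext<string, int> context)
    {
        // No per-task configuration is needed for this combiner.
    }
}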
RecordWriter
Writes output in CSV format to an Azure blob. If the output size is less than or equal to 4 MB, it is also stored in a buffer which is sent back to the controller.
[Serializable]
public class BlobTextCsvWriter : IRecordWriter<string, string>
{
[NonSerialized]
private const int BufferSize = 4 * 1024 * 1024; // 4MB
[NonSerialized]
private bool bufferFull;
[NonSerialized]
protected CloudBlob blob;
[NonSerialized]
private byte[] buffer;
[NonSerialized]
private int noOfBytesWrittenInBuffer;
public CloudClient CloudClient { get; private set; }
public string ContainerName { get; private set; }
public string DirectoryName { get; private set; }
// CTOR
public BlobTextCsvWriter(
CloudClient cloudClient, string containerName, string directoryName = null)
{
if (cloudClient == null)
{
throw new ArgumentNullException("cloudClient");
}
if (string.IsNullOrEmpty(containerName)
|| string.IsNullOrWhiteSpace(containerName))
{
throw new ArgumentNullException("containerName");
}
this.CloudClient = cloudClient;
this.ContainerName = containerName;
this.DirectoryName = directoryName;
}
public virtual void Write(
string outputPartition, IEnumerable<KeyValuePair<string, string>> records)
{
CloudBlobContainer container
= this.CloudClient.BlobClient.GetContainerReference(this.ContainerName);
container.CreateIfNotExist();
this.blob = (!string.IsNullOrEmpty(DirectoryName))
? container
.GetDirectoryReference(DirectoryName)
.GetBlobReference(outputPartition)
: container.GetBlobReference(outputPartition);
buffer = new byte[BufferSize];
IEnumerator<KeyValuePair<string, string>> enumerator
= records.GetEnumerator();
WriteToBuffer(enumerator, buffer);
using (Stream stream = blob.OpenWrite())
{
stream.Write(buffer, 0, noOfBytesWrittenInBuffer);
if (bufferFull)
{
using (StreamWriter sw = new StreamWriter(stream))
{
// Re-write the current record as the buffer is full.
WriteRecord(sw, enumerator.Current);
while (enumerator.MoveNext())
{
WriteRecord(sw, enumerator.Current);
}
}
}
}
}
public IReduceResult<string, string> GetResult()
{
if (bufferFull)
{
return new BlobTextResult(this.CloudClient, blob.Uri.AbsoluteUri);
}
else
{
byte[] localBuffer = new byte[noOfBytesWrittenInBuffer];
Buffer.BlockCopy(buffer, 0, localBuffer, 0, noOfBytesWrittenInBuffer);
return new BlobTextResult(
this.CloudClient, blob.Uri.AbsoluteUri, localBuffer);
}
}
private void WriteToBuffer(
IEnumerator<KeyValuePair<string, string>> enumerator, byte[] buffer)
{
try
{
using (MemoryStream ms = new MemoryStream(buffer))
{
using (StreamWriter sw = new StreamWriter(ms))
{
while (enumerator.MoveNext())
{
WriteRecord(sw, enumerator.Current);
sw.Flush();
noOfBytesWrittenInBuffer = (int)ms.Position;
}
}
}
}
catch (NotSupportedException)
{
bufferFull = true;
}
}
private void WriteRecord(StreamWriter sw, KeyValuePair<string, string> record)
{
sw.Write(record.Key);
sw.Write(",");
sw.Write(record.Value);
sw.WriteLine();
}
}
Deployment
Create an Azure service & storage account
- Using the Windows Azure developer portal:
- Create a hosted Azure service onto which the Daytona service will be deployed.
- Create an Azure storage account used by the Daytona service to store information related to applications such as inputs, outputs, results etc.
Update ServiceConfiguration.cscfg with information regarding the Master & Slave roles
- Master role - is responsible for picking up new applications from the storage, handling communication with all the slaves, assigning Map and Reduce tasks to available slaves, monitoring task execution etc.
- Slave role - is responsible for executing assigned map and reduce tasks, handling communication with the master as well as other slaves, reporting to the master about task execution etc.
- Instances - the instance count of the Slave role, as per the anticipated load and the number of cores allocated to your Azure project. The number of instances for the Master role must be 1.
- DiagnosticsConnectionString - the Azure storage account connection string which will be used for logging by the worker roles.
- StorageConnectionString - the Azure storage account connection string which will be used for storing the input and output data.
- MapTaskSlotSize - the maximum number of map tasks that can be executed at a slave in parallel.
- ReduceTaskSlotSize - the maximum number of reduce tasks that can be executed at a slave in parallel.
<?xml version="1.0" encoding="utf-8"?>
<ServiceConfiguration
serviceName="Research.MapReduce.CloudHost"
xmlns="http://schemas.microsoft.com/ServiceHosting/2008/10/ServiceConfiguration"
osFamily="1"
osVersion="*">
<Role name="Research.MapReduce.CloudHost.Master">
<!--Number of master instances must always be kept as 1-->
<Instances count="1" />
<ConfigurationSettings>
<Setting name="DiagnosticsConnectionString"
value="DefaultEndpointsProtocol=https;AccountName=XXXXXXXX;AccountKey=XXXXXXXXXX"/>
<Setting name="StorageConnectionString"
value="DefaultEndpointsProtocol=https;AccountName=XXXXXXXX;AccountKey=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"/>
</ConfigurationSettings>
</Role>
<Role name="Research.MapReduce.CloudHost.Slave">
<Instances count="2" />
<ConfigurationSettings>
<Setting name="DiagnosticsConnectionString"
value="DefaultEndpointsProtocol=https;AccountName=XXXXXXXX;AccountKey=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"/>
<Setting name="StorageConnectionString"
value="DefaultEndpointsProtocol=https;AccountName=XXXXXXXX;AccountKey=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"/>
<Setting name="MapTaskSlotSize" value="4" />
<Setting name="ReduceTaskSlotSize" value="1" />
</ConfigurationSettings>
</Role>
</ServiceConfiguration>
Update ServiceDefinition.csdef with information regarding the Role element VMSize and LocalStorage
Publish & Deploy the Service
- There are 2 options to deploy a Windows Azure cloud service: 1) using the Azure management portal; 2) using the Visual Studio IDE.
- If deploying using the Azure management portal - Daytona provides a precompiled package with a default "Medium" VM size.
Running the Application
Upload Input - Daytona MapReduce expects input data to come from one of the Azure storage services or SQL Azure. Input can be uploaded to an Azure storage service using tools such as Cerebrata, CloudBerry etc. SQL Server Management Studio can be used to upload data to SQL Azure.
Submit - All the assemblies and references are bundled into a single package. To submit the WordCount application from a console app:
var masterConnectionString =
"DefaultEndpointsProtocol=http;
AccountName=<AccountName>;
AccountKey=<AccountKey>";
var client = new MapReduceClient(
"word-count-" + Guid.NewGuid(),
typeof(WordCountController).AssemblyQualifiedName);
client.Submit(masterConnectionString, false);
Abort - Once the application has been submitted to the Daytona runtime, it is possible to abort it. When an abort is requested, the runtime will kill the master job and send an abort request to all slaves.
client.RequestAbort(masterConnectionString);
Tracking and Result
bool concluded = client.WaitForCompletion(
TimeSpan.FromMinutes(10), TimeSpan.FromSeconds(5));
if (concluded && client.Succeeded)
{
foreach (KeyValuePair<string, string> result in client.Results)
{
Console.WriteLine("{0} \t: {1}", result.Key, result.Value);
}
}
else
{
Console.WriteLine(client.FailReason);
}
The MapReduce client tool
The MapReduce client tool (mrclient.exe) is a command-line utility that targets the Daytona MapReduce framework to perform common operations such as application submission, monitoring etc. from the command line. Syntax:
mrclient [-cs <connection string>] -c <command> [<switch>...] [-h] [-?]
Commands:
- createpkg - creates a new application package in the Azure storage.
- deletepkg - deletes an application package from the Azure storage.
- submitapp - submits a MapReduce application for execution.
- listapps - lists applications submitted to the service which pass the filtering criteria specified by the switches.
- getapp - gets the details of an application.
- abortapp - requests the service to abort a running application.
- deleteapp - deletes application information from the Azure storage.
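For example (the connection string values are placeholders), listing the applications submitted to a service might look like:
mrclient -cs "DefaultEndpointsProtocol=https;AccountName=<AccountName>;AccountKey=<AccountKey>" -c listapps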
Conclusion
We've had a look at the workings of Daytona - an iterative MapReduce runtime for Windows Azure.
Remember, this is Microsoft's equivalent of Hadoop; the competing distributed computing infrastructure and API, Dryad + LINQ to HPC, is now redundant.
In this blog post, we've had a look at the development steps, key properties, architecture, terminology, operational steps, and API of Daytona. We've also done a walkthrough of implementing, deploying and running a Word Count MapReduce app for Azure.
Good luck using Daytona to scale out computations for analysis of distributed data and construct distributed computing cloud data-analysis services on Windows Azure!