Distributed Task Runner
Manager
startup.cs
using ManagerNode;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Quartz;
using Quartz.Impl;
using Quartz.Spi;
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
services.AddSingleton<JobSchedulerService>();
services.AddSingleton<RabbitMqJobPublisher>();
// Register Quartz services
services.AddSingleton<IJobFactory, SingletonJobFactory>();
services.AddSingleton<ISchedulerFactory, StdSchedulerFactory>();
// Register the job itself
services.AddSingleton<DistributedQuartzJob>();
// Register the Quartz scheduler with dependencies
services.AddHostedService<QuartzHostedService>();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}
}
public class SingletonJobFactory : IJobFactory
{
private readonly IServiceProvider _serviceProvider;
public SingletonJobFactory(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public IJob NewJob(TriggerFiredBundle bundle, IScheduler scheduler)
{
return _serviceProvider.GetService(bundle.JobDetail.JobType) as IJob;
}
public void ReturnJob(IJob job) { }
}
public class QuartzHostedService : IHostedService
{
private readonly ISchedulerFactory _schedulerFactory;
private readonly IJobFactory _jobFactory;
private readonly JobSchedulerService _jobSchedulerService;
private IScheduler _scheduler;
public QuartzHostedService(ISchedulerFactory schedulerFactory, IJobFactory jobFactory, JobSchedulerService jobSchedulerService)
{
_schedulerFactory = schedulerFactory;
_jobFactory = jobFactory;
_jobSchedulerService = jobSchedulerService;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
_scheduler = await _schedulerFactory.GetScheduler(cancellationToken);
_scheduler.JobFactory = _jobFactory;
// If you need to start scheduling jobs on startup, do it here
await _scheduler.Start(cancellationToken);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
if (_scheduler != null)
{
await _scheduler.Shutdown(cancellationToken);
}
}
}
program.cs
public class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
});
}
JobSchedulerService.cs
using ManagerNode;
using Quartz;
using Quartz.Impl;
using RabbitMQ.Client;
using System.Text;
public class JobSchedulerService
{
private readonly IScheduler _scheduler;
public JobSchedulerService(ISchedulerFactory schedulerFactory)
{
_scheduler = schedulerFactory.GetScheduler().Result;
}
public async Task ScheduleJob(string jobName, string jobGroup, string triggerName, string triggerGroup, DateTimeOffset startTime)
{
var job = JobBuilder.Create<DistributedQuartzJob>()
.WithIdentity(jobName, jobGroup)
.Build();
var trigger = TriggerBuilder.Create()
.WithIdentity(triggerName, triggerGroup)
.StartAt(startTime)
.Build();
await _scheduler.ScheduleJob(job, trigger);
}
}
public class DistributedQuartzJob : IJob
{
private readonly RabbitMqJobPublisher _rabbitMqJobPublisher;
public DistributedQuartzJob(RabbitMqJobPublisher rabbitMqJobPublisher)
{
_rabbitMqJobPublisher = rabbitMqJobPublisher;
}
public Task Execute(IJobExecutionContext context)
{
// Job execution logic
var message = "DistributedJob executed at " + DateTime.Now;
Console.WriteLine(message);
// Add job details to RabbitMQ queue
_rabbitMqJobPublisher.PublishJob(message);
return Task.CompletedTask;
}
}
RabbitMqJobPublisher.cs
using Newtonsoft.Json;
using RabbitMQ.Client;
using System.Text;
using DistributedScheduler.Jobs;
namespace ManagerNode
{
public class TypedObject
{
public required string Type { get; set; }
public required object Data { get; set; }
}
public class RabbitMqJobPublisher
{
private readonly IConnection _connection;
private readonly IModel _channel;
public RabbitMqJobPublisher()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.QueueDeclare(queue: "job_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
}
public void PublishJob(IDistributedJob job)
{
var message = JsonConvert.SerializeObject(job, Formatting.Indented);
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(exchange: "",
routingKey: "job_queue",
basicProperties: null,
body: body);
}
public void PublishJob(string jobString)
{
var message = jobString;
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(exchange: "",
routingKey: "job_queue",
basicProperties: null,
body: body);
}
}
}
JobController.cs
using DistributedScheduler.Jobs;
using Microsoft.AspNetCore.Mvc;
namespace ManagerNode.Controllers
{
[ApiController]
[Route("api/manager/job")]
public class JobController : ControllerBase
{
private readonly JobSchedulerService _jobSchedulerService;
public JobController(JobSchedulerService jobSchedulerService)
{
_jobSchedulerService = jobSchedulerService;
}
[HttpPost]
public async Task<IActionResult> SubmitJob([FromBody] DistributedJob job)
{
if (_jobSchedulerService is null)
throw new ArgumentNullException(nameof(_jobSchedulerService));
await _jobSchedulerService.ScheduleJob(job.Name, job.Name, job.Name, job.Name, new DateTimeOffset(DateTime.Now.AddMinutes(1)));
return Ok(new { job.Id, Status = "Job submitted and published to queue successfully" });
}
}
}
ManagerController.cs
using Microsoft.AspNetCore.Mvc;
[ApiController]
[Route("api/manager")]
public class ManagerController : ControllerBase
{
[HttpGet("health")]
public IActionResult Health()
{
var healthStatus = new
{
Status = "Healthy",
PendingTasks = 5, // Example value
ConnectedRunners = 10 // Example value
};
return Ok(healthStatus);
}
[HttpGet("status")]
public IActionResult Status()
{
// Return detailed status, possibly with more complex data
return Ok("Manager is operational with all systems functioning.");
}
}
Runner
startup.cs
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
services.AddSingleton<JobConsumerService>(); // Register JobConsumerService
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
// Ensure JobConsumerService is instantiated when the application starts
var jobConsumerService = app.ApplicationServices.GetService<JobConsumerService>();
}
}
program.cs
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;
public class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
});
}
RunnerController.cs
using Microsoft.AspNetCore.Mvc;
namespace RunnerNode.Controllers
{
[ApiController]
[Route("api/runner")]
public class RunnerController : ControllerBase
{
[HttpGet("health")]
public IActionResult GetHealth()
{
// Implement health check logic
return Ok("Runner Node is healthy");
}
[HttpGet("status")]
public IActionResult GetStatus()
{
// Implement status retrieval logic
return Ok("Status: Processing jobs");
}
}
}
JobConsumeService.cs
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
public class JobConsumerService
{
private readonly IConnection _connection;
private readonly IModel _channel;
public JobConsumerService()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.QueueDeclare(queue: "job_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
ProcessJob(message);
};
_channel.BasicConsume(queue: "job_queue",
autoAck: true, // Auto-acknowledge messages
consumer: consumer);
}
private void ProcessJob(string jobData)
{
try
{
Console.WriteLine($"Processing job: {jobData}");
// Implement your job processing logic here
Console.WriteLine($"Job processed successfully: {jobData}. Completed on {DateTime.Now}");
}
catch (Exception ex)
{
Console.WriteLine($"Error processing job: {ex.Message}");
// Handle exception (e.g., retry logic)
}
}
}
Distributed Jobs Class Lib
DistributedJob.cs
public class DistributedJob : IDistributedJob
{
public required string Id { get; set; }
public required string Name { get; set; }
public required string Description { get; set; }
public required string TypeName { get; set; }
public required string Status { get; set; }
public required string CronSchedule { get; set; }
public DateTime? CreatedAt { get; set; }
}
public interface IDistributedJob
{
string Id { get; set; }
string Name { get; set; }
string Description { get; set; }
string TypeName { get; set; }
string Status { get; set; }
string CronSchedule { get; set; }
DateTime? CreatedAt { get; set; }
}
Run Rabbit MQ with docker. It is easier than installing in local machine
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
Postman Collection
{
"info": {
"_postman_id": "108dded1-e54e-4999-974a-4bb8353b8da1",
"name": "Distributed Scheduler",
"schema": "https://schema.getpostman.com/json/collection/v2.0.0/collection.json",
"_exporter_id": "15322780"
},
"item": [
{
"name": "Manager",
"item": [
{
"name": "Job",
"item": [
{
"name": "New Request",
"request": {
"method": "POST",
"header": [],
"body": {
"mode": "raw",
"raw": "{\r\n \"Id\": \"acfd5c51-fc47-4f26-8fb1-0c5253ef9e65\",\r\n \"Name\": \"Job 1\",\r\n \"Description\": \"Job One\",\r\n \"status\": \"pending\",\r\n \"TypeName\": \"DistributedJob, DistributedScheduler.Jobs\",\r\n \"CronSchedule\": \"* * * * \"\r\n}",
"options": {
"raw": {
"language": "json"
}
}
},
"url": "http://localhost:5082/api/manager/job"
},
"response": []
}
]
},
{
"name": "Health",
"request": {
"method": "GET",
"header": [],
"url": "http://localhost:5082/api/manager/health"
},
"response": []
},
{
"name": "Status",
"request": {
"method": "GET",
"header": [],
"url": "http://localhost:5082/api/manager/health"
},
"response": []
}
]
},
{
"name": "Runner",
"item": [
{
"name": "Health",
"request": {
"method": "GET",
"header": [],
"url": "http://localhost:5028/api/runner/health"
},
"response": []
},
{
"name": "Status",
"request": {
"method": "GET",
"header": [],
"url": "http://localhost:5028/api/runner/health"
},
"response": []
}
]
}
]
}