Distributed Task Runner
Manager
startup.cs
using ManagerNode;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Quartz;
using Quartz.Impl;
using Quartz.Spi;
/// <summary>
/// Wires up dependency injection and the HTTP request pipeline for the manager node.
/// </summary>
public class Startup
{
    /// <summary>
    /// Registers MVC controllers, the scheduling/publishing services, the Quartz
    /// factories, the job type, and the hosted service that starts the scheduler.
    /// </summary>
    /// <param name="services">The service collection to populate.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllers();

        services
            // Application services.
            .AddSingleton<JobSchedulerService>()
            .AddSingleton<RabbitMqJobPublisher>()
            // Quartz infrastructure: job factory and scheduler factory.
            .AddSingleton<IJobFactory, SingletonJobFactory>()
            .AddSingleton<ISchedulerFactory, StdSchedulerFactory>()
            // The job type itself, so the job factory can resolve it from the container.
            .AddSingleton<DistributedQuartzJob>()
            // Hosted service that starts and stops the Quartz scheduler with the host.
            .AddHostedService<QuartzHostedService>();
    }

    /// <summary>
    /// Builds the request pipeline: developer exception page (development only),
    /// routing, and controller endpoints.
    /// </summary>
    /// <param name="app">The application pipeline builder.</param>
    /// <param name="env">The current hosting environment.</param>
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        bool showDetailedErrors = env.IsDevelopment();
        if (showDetailedErrors)
        {
            app.UseDeveloperExceptionPage();
        }

        app.UseRouting();
        app.UseEndpoints(routes => routes.MapControllers());
    }
}
/// <summary>
/// Quartz job factory that resolves job instances from the application's
/// <see cref="IServiceProvider"/> instead of constructing them directly.
/// </summary>
public class SingletonJobFactory : IJobFactory
{
    private readonly IServiceProvider _serviceProvider;

    /// <summary>Creates the factory.</summary>
    /// <param name="serviceProvider">Container used to resolve job instances.</param>
    /// <exception cref="ArgumentNullException"><paramref name="serviceProvider"/> is null.</exception>
    public SingletonJobFactory(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    }

    /// <summary>
    /// Resolves the job instance for the trigger that just fired.
    /// </summary>
    /// <param name="bundle">Describes the fired trigger and the job detail to run.</param>
    /// <param name="scheduler">The scheduler requesting the job (unused).</param>
    /// <returns>The job instance resolved from the container.</returns>
    /// <exception cref="SchedulerException">
    /// The job type is not registered in the container or does not implement <see cref="IJob"/>.
    /// </exception>
    public IJob NewJob(TriggerFiredBundle bundle, IScheduler scheduler)
    {
        var jobType = bundle.JobDetail.JobType;

        if (_serviceProvider.GetService(jobType) is IJob job)
        {
            return job;
        }

        // Fail loudly here instead of returning null, which would only surface
        // later as an opaque null-reference failure inside the scheduler.
        throw new SchedulerException(
            $"Job type '{jobType.FullName}' could not be resolved from the service provider as an IJob. " +
            "Ensure it is registered in ConfigureServices.");
    }

    /// <summary>
    /// Intentionally a no-op: this factory does not create the instances it hands
    /// out, so it has nothing to release.
    /// </summary>
    /// <param name="job">The job instance being returned.</param>
    public void ReturnJob(IJob job) { }
}
/// <summary>
/// Hosted service that obtains the Quartz scheduler on start-up, attaches the
/// container-backed job factory, starts it, and shuts it down when the host stops.
/// </summary>
public class QuartzHostedService : IHostedService
{
    private readonly ISchedulerFactory _schedulerFactory;
    private readonly IJobFactory _jobFactory;

    // Stored but not used by any method of this class.
    // NOTE(review): presumably injected so the service is constructed at start-up — confirm.
    private readonly JobSchedulerService _jobSchedulerService;

    // Assigned in StartAsync; remains null if the host stops before start completes.
    private IScheduler _scheduler;

    /// <summary>Creates the hosted service from its injected dependencies.</summary>
    /// <param name="schedulerFactory">Factory that supplies the scheduler.</param>
    /// <param name="jobFactory">Job factory to attach to the scheduler.</param>
    /// <param name="jobSchedulerService">Scheduling service (stored only).</param>
    public QuartzHostedService(ISchedulerFactory schedulerFactory, IJobFactory jobFactory, JobSchedulerService jobSchedulerService)
    {
        (_schedulerFactory, _jobFactory, _jobSchedulerService) =
            (schedulerFactory, jobFactory, jobSchedulerService);
    }

    /// <summary>
    /// Fetches the scheduler, installs the job factory, and starts the scheduler.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the Quartz calls.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        IScheduler scheduler = _scheduler = await _schedulerFactory.GetScheduler(cancellationToken);
        scheduler.JobFactory = _jobFactory;

        // Jobs that must exist from start-up onwards can be scheduled here, before Start.
        await scheduler.Start(cancellationToken);
    }

    /// <summary>
    /// Shuts the scheduler down, if one was obtained in <see cref="StartAsync"/>.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the shutdown call.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if (_scheduler is null)
        {
            return;
        }

        await _scheduler.Shutdown(cancellationToken);
    }
}
program.cs
/// <summary>
/// Entry point of the manager node's web host.
/// </summary>
public class Program
{
    /// <summary>Builds the host and runs it.</summary>
    /// <param name="args">Command-line arguments forwarded to the host builder.</param>
    public static void Main(string[] args)
    {
        var host = CreateHostBuilder(args).Build();
        host.Run();
    }

    /// <summary>
    /// Creates the default host builder and configures the web host to use <see cref="Startup"/>.
    /// </summary>
    /// <param name="args">Command-line arguments passed to the default builder.</param>
    /// <returns>The configured host builder.</returns>
    public static IHostBuilder CreateHostBuilder(string[] args)
    {
        var builder = Host.CreateDefaultBuilder(args);
        return builder.ConfigureWebHostDefaults(webBuilder => webBuilder.UseStartup<Startup>());
    }
}
JobSchedulerService.cs
using ManagerNode;
using Quartz;
using Quartz.Impl;
using RabbitMQ.Client;
using System.Text;
/// <summary>
/// Schedules <see cref="DistributedQuartzJob"/> instances on the Quartz scheduler
/// supplied by the injected <see cref="ISchedulerFactory"/>.
/// </summary>
public class JobSchedulerService
{
    private readonly ISchedulerFactory _schedulerFactory;

    /// <summary>Creates the service.</summary>
    /// <param name="schedulerFactory">Factory used to obtain the scheduler.</param>
    /// <exception cref="ArgumentNullException"><paramref name="schedulerFactory"/> is null.</exception>
    public JobSchedulerService(ISchedulerFactory schedulerFactory)
    {
        // Only the factory is stored here. The scheduler is resolved asynchronously
        // in ScheduleJob so the constructor never blocks on a Task, which would
        // tie up a thread and wrap failures in AggregateException.
        _schedulerFactory = schedulerFactory ?? throw new ArgumentNullException(nameof(schedulerFactory));
    }

    /// <summary>
    /// Schedules a one-shot <see cref="DistributedQuartzJob"/> that fires at <paramref name="startTime"/>.
    /// </summary>
    /// <param name="jobName">Name part of the job identity.</param>
    /// <param name="jobGroup">Group part of the job identity.</param>
    /// <param name="triggerName">Name part of the trigger identity.</param>
    /// <param name="triggerGroup">Group part of the trigger identity.</param>
    /// <param name="startTime">When the trigger should fire.</param>
    /// <param name="cancellationToken">Optional token forwarded to the Quartz calls.</param>
    /// <returns>A task that completes once the job and trigger have been handed to the scheduler.</returns>
    public async Task ScheduleJob(string jobName, string jobGroup, string triggerName, string triggerGroup, DateTimeOffset startTime, CancellationToken cancellationToken = default)
    {
        // NOTE(review): relies on the factory returning the same scheduler instance on
        // every call, as StdSchedulerFactory is documented to do — confirm if a
        // different ISchedulerFactory implementation is ever registered.
        var scheduler = await _schedulerFactory.GetScheduler(cancellationToken);

        var job = JobBuilder.Create<DistributedQuartzJob>()
            .WithIdentity(jobName, jobGroup)
            .Build();

        var trigger = TriggerBuilder.Create()
            .WithIdentity(triggerName, triggerGroup)
            .StartAt(startTime)
            .Build();

        await scheduler.ScheduleJob(job, trigger, cancellationToken);
    }
}
/// <summary>
/// Quartz job that writes an execution notice to the console and publishes the
/// same text through <see cref="RabbitMqJobPublisher"/>.
/// </summary>
public class DistributedQuartzJob : IJob
{
    private readonly RabbitMqJobPublisher _rabbitMqJobPublisher;

    /// <summary>Creates the job with the publisher it forwards messages to.</summary>
    /// <param name="rabbitMqJobPublisher">Publisher that receives the execution notice.</param>
    public DistributedQuartzJob(RabbitMqJobPublisher rabbitMqJobPublisher)
        => _rabbitMqJobPublisher = rabbitMqJobPublisher;

    /// <summary>
    /// Runs the job: builds the notice, echoes it to the console, and publishes it.
    /// </summary>
    /// <param name="context">Quartz execution context (unused).</param>
    /// <returns>An already-completed task; all work is done synchronously.</returns>
    public Task Execute(IJobExecutionContext context)
    {
        string executionNotice = $"DistributedJob executed at {DateTime.Now}";

        Console.WriteLine(executionNotice);

        // Forward the notice to the job queue.
        _rabbitMqJobPublisher.PublishJob(executionNotice);

        return Task.CompletedTask;
    }
}
RabbitMqJobPublisher.cs
using Newtonsoft.Json;
using RabbitMQ.Client;
using System.Text;
using DistributedScheduler.Jobs;
namespace ManagerNode
{
    /// <summary>
    /// Pairs an arbitrary payload with a string describing its type.
    /// NOTE(review): not referenced by the publisher below; presumably a message
    /// envelope for typed payloads — confirm against its callers.
    /// </summary>
    public class TypedObject
    {
        /// <summary>Type discriminator for <see cref="Data"/>.</summary>
        public required string Type { get; set; }

        /// <summary>The payload.</summary>
        public required object Data { get; set; }
    }

    /// <summary>
    /// Publishes job messages to the durable "job_queue" RabbitMQ queue on localhost
    /// through the default exchange.
    /// </summary>
    /// <remarks>
    /// Owns a connection and a channel for its whole lifetime and releases them in
    /// <see cref="Dispose"/>. Publishing is serialized with a lock because the
    /// RabbitMQ client documents channels as unsafe for concurrent use, and this
    /// publisher is shared by jobs that may run on different threads.
    /// </remarks>
    public class RabbitMqJobPublisher : IDisposable
    {
        private const string QueueName = "job_queue";

        // Guards _channel and _disposed.
        private readonly object _publishGate = new();
        private readonly IConnection _connection;
        private readonly IModel _channel;
        private bool _disposed;

        /// <summary>
        /// Connects to RabbitMQ on localhost, opens a channel, and declares the job queue.
        /// </summary>
        public RabbitMqJobPublisher()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            _connection = factory.CreateConnection();
            try
            {
                _channel = _connection.CreateModel();
                _channel.QueueDeclare(queue: QueueName,
                                      durable: true,
                                      exclusive: false,
                                      autoDelete: false,
                                      arguments: null);
            }
            catch
            {
                // Construction failed part-way: release what was opened so the
                // connection is not leaked, then surface the original error.
                _channel?.Dispose();
                _connection.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Serializes <paramref name="job"/> to indented JSON and publishes it.
        /// </summary>
        /// <param name="job">The job to publish.</param>
        /// <exception cref="ArgumentNullException"><paramref name="job"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The publisher has been disposed.</exception>
        public void PublishJob(IDistributedJob job)
        {
            ArgumentNullException.ThrowIfNull(job);

            string message = JsonConvert.SerializeObject(job, Formatting.Indented);
            PublishJob(message);
        }

        /// <summary>
        /// Publishes <paramref name="jobString"/> as a UTF-8 encoded message body.
        /// </summary>
        /// <param name="jobString">The message text to publish.</param>
        /// <exception cref="ArgumentNullException"><paramref name="jobString"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The publisher has been disposed.</exception>
        public void PublishJob(string jobString)
        {
            ArgumentNullException.ThrowIfNull(jobString);

            var body = Encoding.UTF8.GetBytes(jobString);

            lock (_publishGate)
            {
                if (_disposed)
                {
                    throw new ObjectDisposedException(nameof(RabbitMqJobPublisher));
                }

                // NOTE(review): basicProperties is null, so messages are not marked
                // persistent even though the queue is durable — confirm that is intended.
                _channel.BasicPublish(exchange: "",
                                      routingKey: QueueName,
                                      basicProperties: null,
                                      body: body);
            }
        }

        /// <summary>
        /// Closes the channel and the connection. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            lock (_publishGate)
            {
                if (_disposed)
                {
                    return;
                }

                _disposed = true;
                _channel.Dispose();
                _connection.Dispose();
            }

            GC.SuppressFinalize(this);
        }
    }
}
JobController.cs
Runner
startup.cs
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
/// <summary>
/// Wires up dependency injection and the HTTP request pipeline for the runner node.
/// </summary>
public class Startup
{
    /// <summary>
    /// Registers MVC controllers and the job consumer service.
    /// </summary>
    /// <param name="services">The service collection to populate.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllers();
        services.AddSingleton<JobConsumerService>(); // Register JobConsumerService
    }

    /// <summary>
    /// Builds the request pipeline (developer exception page in development,
    /// routing, controller endpoints) and forces the job consumer to be created.
    /// </summary>
    /// <param name="app">The application pipeline builder.</param>
    /// <param name="env">The current hosting environment.</param>
    /// <exception cref="InvalidOperationException">
    /// <c>JobConsumerService</c> is not registered in the container.
    /// </exception>
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        app.UseRouting();
        app.UseEndpoints(endpoints =>
        {
            endpoints.MapControllers();
        });

        // Resolve the singleton now so it is constructed at start-up rather than on
        // first use. GetRequiredService throws if the registration is missing, so the
        // node cannot start without its consumer; the instance itself is not needed here.
        _ = app.ApplicationServices.GetRequiredService<JobConsumerService>();
    }
}
program.cs
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;
/// <summary>
/// Entry point of the runner node's web host.
/// </summary>
public class Program
{
    /// <summary>Builds the host and runs it.</summary>
    /// <param name="args">Command-line arguments forwarded to the host builder.</param>
    public static void Main(string[] args)
    {
        var host = CreateHostBuilder(args).Build();
        host.Run();
    }

    /// <summary>
    /// Creates the default host builder and configures the web host to use <see cref="Startup"/>.
    /// </summary>
    /// <param name="args">Command-line arguments passed to the default builder.</param>
    /// <returns>The configured host builder.</returns>
    public static IHostBuilder CreateHostBuilder(string[] args)
    {
        var builder = Host.CreateDefaultBuilder(args);
        return builder.ConfigureWebHostDefaults(webBuilder => webBuilder.UseStartup<Startup>());
    }
}
using Microsoft.AspNetCore.Mvc;
namespace RunnerNode.Controllers
{
    /// <summary>
    /// Exposes the runner node's health and status endpoints under <c>api/runner</c>.
    /// </summary>
    [ApiController]
    [Route("api/runner")]
    public class RunnerController : ControllerBase
    {
        /// <summary>
        /// Health endpoint. Currently a placeholder that always reports healthy.
        /// </summary>
        /// <returns>200 OK with a fixed health message.</returns>
        [HttpGet("health")]
        public IActionResult GetHealth() => Ok("Runner Node is healthy");

        /// <summary>
        /// Status endpoint. Currently a placeholder that always reports the same status.
        /// </summary>
        /// <returns>200 OK with a fixed status message.</returns>
        [HttpGet("status")]
        public IActionResult GetStatus() => Ok("Status: Processing jobs");
    }
}
No comments:
Post a Comment