im com
This commit is contained in:
parent
aa5af48111
commit
97f28975d5
1 changed files with 174 additions and 115 deletions
|
@ -127,39 +127,64 @@ namespace
|
||||||
return MkFPos<PosT>::mk(pos, map);
|
return MkFPos<PosT>::mk(pos, map);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <class TarIndex, class SrcIndex, class F>
|
||||||
|
static void setupMap(const Sptr<TarIndex>& ti, const Sptr<SrcIndex>& si,
|
||||||
|
const F& f, const Sptr<Vector<SizeT>>& m)
|
||||||
|
{
|
||||||
|
auto six = *si;
|
||||||
|
auto sie = si->range()->end();
|
||||||
|
auto tix = *ti;
|
||||||
|
for(six = 0; six != sie; ++six){
|
||||||
|
tix.at( f(*six) );
|
||||||
|
if(six.rank() == getRankNumber()){
|
||||||
|
(*m)[six.local()->lex()] = tix.pos();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template <class TarIndex, class SrcIndex, class F>
|
||||||
|
static Sptr<Vector<SizeT>> setupMap(const Sptr<TarIndex>& ti, const Sptr<SrcIndex>& si,
|
||||||
|
const F& f)
|
||||||
|
{
|
||||||
|
auto o = std::make_shared<Vector<SizeT>>(si->local()->lmax().val());
|
||||||
|
setupMap(ti,si,f,o);
|
||||||
|
return o;
|
||||||
|
}
|
||||||
|
|
||||||
template <class TarIndex, class SrcIndex, class Xpr>
|
template <class TarIndex, class SrcIndex, class Xpr>
|
||||||
class MapXpr : public XprInterface<MapXpr<TarIndex,SrcIndex,Xpr>>
|
class MapXpr : public XprInterface<MapXpr<TarIndex,SrcIndex,Xpr>>
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
Sptr<TarIndex> mTi;
|
Sptr<TarIndex> mTi;
|
||||||
Sptr<SrcIndex> mSi;
|
Sptr<SrcIndex> mSi;
|
||||||
Vector<SizeT> mMap;
|
Sptr<Vector<SizeT>> mMap;
|
||||||
Xpr mXpr;
|
Xpr mXpr;
|
||||||
typedef decltype(mkFPos( mXpr.rootSteps(mTi->id()), mMap.data() )) Ext;
|
typedef decltype(mkFPos( mXpr.rootSteps(mTi->id()), mMap->data() )) Ext;
|
||||||
Ext mExt;
|
Ext mExt;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
||||||
MapXpr() = default;
|
MapXpr() = default;
|
||||||
|
|
||||||
// src local
|
// src local
|
||||||
template <class F>
|
template <class F>
|
||||||
MapXpr(const Sptr<TarIndex>& ti, const Sptr<SrcIndex>& si, const F& f, Xpr&& xpr) :
|
MapXpr(const Sptr<TarIndex>& ti, const Sptr<SrcIndex>& si, const F& f, Xpr&& xpr) :
|
||||||
mTi(ti), mSi(si), mMap(mSi->local()->lmax().val()), mXpr(std::forward<Xpr>(xpr)),
|
mTi(ti), mSi(si),
|
||||||
mExt(mkFPos( mXpr.rootSteps(mTi->id()), mMap.data() ))
|
mMap(std::make_shared<Vector<SizeT>>(mSi->local()->lmax().val())),
|
||||||
|
mXpr(std::forward<Xpr>(xpr)),
|
||||||
|
mExt(mkFPos( mXpr.rootSteps(mTi->id()), mMap->data() ))
|
||||||
{
|
{
|
||||||
//VCHECK(mTi->id().id());
|
setupMap(ti,si,f,mMap);
|
||||||
auto six = *si;
|
|
||||||
auto sie = si->range()->end();
|
|
||||||
auto tix = *ti;
|
|
||||||
for(six = 0; six != sie; ++six){
|
|
||||||
tix.at( f(*six) );
|
|
||||||
if(six.rank() == getRankNumber()){
|
|
||||||
mMap[six.local()->lex()] = tix.pos();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
MapXpr(const Sptr<TarIndex>& ti, const Sptr<SrcIndex>& si,
|
||||||
|
const Sptr<Vector<SizeT>>& m, Xpr&& xpr) :
|
||||||
|
mTi(ti), mSi(si),
|
||||||
|
mMap(m), mXpr(std::forward<Xpr>(xpr)),
|
||||||
|
mExt(mkFPos( mXpr.rootSteps(mTi->id()), mMap->data() ))
|
||||||
|
{}
|
||||||
|
|
||||||
|
|
||||||
template <class PosT>
|
template <class PosT>
|
||||||
decltype(auto) operator()(const PosT& last) const
|
decltype(auto) operator()(const PosT& last) const
|
||||||
{
|
{
|
||||||
|
@ -183,12 +208,114 @@ namespace
|
||||||
{
|
{
|
||||||
return MapXpr<TarIndex,SrcIndex,Xpr>(ti,si,f,std::forward<Xpr>(xpr));
|
return MapXpr<TarIndex,SrcIndex,Xpr>(ti,si,f,std::forward<Xpr>(xpr));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <class TarIndex, class SrcIndex, class Xpr>
|
||||||
|
decltype(auto) mapXpr(const Sptr<TarIndex>& ti, const Sptr<SrcIndex>& si,
|
||||||
|
const Sptr<Vector<SizeT>>& m, Xpr&& xpr)
|
||||||
|
{
|
||||||
|
return MapXpr<TarIndex,SrcIndex,Xpr>(ti,si,m,std::forward<Xpr>(xpr));
|
||||||
|
}
|
||||||
|
|
||||||
|
template <class TarIndex, class SrcIndex, typename T>
|
||||||
|
void setupBuffer(const Sptr<TarIndex>& rgj, const Sptr<SrcIndex>& rgi,
|
||||||
|
const Sptr<Vector<SizeT>>& fmap, const Vector<T>& data,
|
||||||
|
Vector<T>& buf, Vector<const T*>& map, const SizeT blocks)
|
||||||
|
{
|
||||||
|
const SizeT myrank = getRankNumber();
|
||||||
|
const SizeT Nranks = getNumRanks();
|
||||||
|
|
||||||
|
const SizeT mapsize = rgj->range()->size();
|
||||||
|
map = Vector<const T*>(mapsize,nullptr);
|
||||||
|
Vector<Vector<T>> sendbuf(Nranks);
|
||||||
|
for(auto& sb: sendbuf){
|
||||||
|
sb.reserve(data.size());
|
||||||
|
}
|
||||||
|
Vector<Vector<SizeT>> request(Nranks);
|
||||||
|
const SizeT locsz = rgi->local()->lmax().val();
|
||||||
|
|
||||||
|
// First loop: setup send buffer
|
||||||
|
rgi->ifor( mapXpr(rgj, rgi, fmap,
|
||||||
|
operation
|
||||||
|
( [&](SizeT p, SizeT q) {
|
||||||
|
const SizeT r = p / locsz;
|
||||||
|
if(myrank != r){
|
||||||
|
request[r].push_back(p % locsz);
|
||||||
|
}
|
||||||
|
} , posop(rgj), posop(rgi) ) ) ,
|
||||||
|
NoF {} )();
|
||||||
|
|
||||||
|
// transfer:
|
||||||
|
Vector<SizeT> reqsizes(Nranks);
|
||||||
|
SizeT bufsize = 0;
|
||||||
|
Vector<Vector<SizeT>> ext(Nranks);
|
||||||
|
for(auto& e: ext){
|
||||||
|
e.resize(Nranks);
|
||||||
|
}
|
||||||
|
for(SizeT i = 0; i != Nranks; ++i){
|
||||||
|
reqsizes[i] = request[i].size();
|
||||||
|
bufsize += reqsizes[i]*blocks;
|
||||||
|
ext[myrank][i] = reqsizes[i];
|
||||||
|
}
|
||||||
|
buf.resize(bufsize);
|
||||||
|
MPI_Status stat;
|
||||||
|
|
||||||
|
// transfer requests:
|
||||||
|
for(SizeT o = 1; o != Nranks; ++o){
|
||||||
|
const SizeT dstr = (myrank + o) % Nranks;
|
||||||
|
const SizeT srcr = (myrank - o + Nranks) % Nranks;
|
||||||
|
SizeT sendsize = 0;
|
||||||
|
MPI_Sendrecv(reqsizes.data()+dstr, 1, MPI_UNSIGNED_LONG, dstr, 0,
|
||||||
|
&sendsize, 1, MPI_UNSIGNED_LONG, srcr, 0, MPI_COMM_WORLD, &stat);
|
||||||
|
ext[srcr][myrank] = sendsize;
|
||||||
|
Vector<SizeT> sendpos(sendsize);
|
||||||
|
MPI_Sendrecv(request[dstr].data(), reqsizes[dstr], MPI_UNSIGNED_LONG, dstr, 0,
|
||||||
|
sendpos.data(), sendsize, MPI_UNSIGNED_LONG, srcr, 0, MPI_COMM_WORLD, &stat);
|
||||||
|
sendbuf[srcr].resize(sendsize*blocks);
|
||||||
|
for(SizeT i = 0; i != sendsize; ++i){
|
||||||
|
std::memcpy( sendbuf[srcr].data()+i*blocks, data.data()+sendpos[i]*blocks, blocks*sizeof(T) );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const MPI_Datatype dt = Typemap<T>::value();
|
||||||
|
|
||||||
|
// transfer data:
|
||||||
|
for(SizeT o = 1; o != Nranks; ++o){
|
||||||
|
const SizeT dstr = (myrank + o) % Nranks;
|
||||||
|
const SizeT srcr = (myrank - o + Nranks) % Nranks;
|
||||||
|
SizeT off = 0;
|
||||||
|
for(SizeT p = 0; p != srcr; ++p){
|
||||||
|
off += ext[myrank][p];
|
||||||
|
}
|
||||||
|
|
||||||
|
MPI_Sendrecv(sendbuf[dstr].data(), ext[dstr][myrank]*blocks, dt, dstr, 0,
|
||||||
|
buf.data()+off*blocks, ext[myrank][srcr]*blocks, dt, srcr, 0,
|
||||||
|
MPI_COMM_WORLD, &stat);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Second loop: Assign map to target buffer positions:
|
||||||
|
Vector<SizeT> cnt(Nranks);
|
||||||
|
rgi->ifor( mapXpr(rgj, rgi, fmap,
|
||||||
|
operation
|
||||||
|
( [&](SizeT p, SizeT q) {
|
||||||
|
const SizeT r = p / locsz;
|
||||||
|
if(myrank != r){
|
||||||
|
SizeT off = 0;
|
||||||
|
for(SizeT s = 0; s != r; ++s){
|
||||||
|
off += ext[myrank][s];
|
||||||
|
}
|
||||||
|
map[p] = buf.data() + off*blocks + cnt[r]*blocks;
|
||||||
|
++cnt[r];
|
||||||
|
}
|
||||||
|
map[q + myrank*locsz] = data.data() + q*blocks;
|
||||||
|
} , posop(rgj), posop(rgi) ) ), NoF {} )();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void run2(const Env& env)
|
void run2(const Env& env)
|
||||||
{
|
{
|
||||||
const SizeT myrank = getRankNumber();
|
const SizeT myrank = getRankNumber();
|
||||||
const SizeT Nranks = getNumRanks();
|
//const SizeT Nranks = getNumRanks();
|
||||||
|
|
||||||
typedef UIndex<Int> UI;
|
typedef UIndex<Int> UI;
|
||||||
typedef MIndex<UI,UI,UI,UI> LocI;
|
typedef MIndex<UI,UI,UI,UI> LocI;
|
||||||
|
@ -207,25 +334,22 @@ void run2(const Env& env)
|
||||||
constexpr auto C3 = CSizeT<3> {};
|
constexpr auto C3 = CSizeT<3> {};
|
||||||
|
|
||||||
const SizeT LSize = env.mRRange->sub(1)->size();
|
const SizeT LSize = env.mRRange->sub(1)->size();
|
||||||
|
|
||||||
const SizeT blocks = env.mSRange->size();
|
const SizeT blocks = env.mSRange->size();
|
||||||
|
|
||||||
Vector<Double> data(LSize*blocks);
|
Vector<Double> data(LSize*blocks);
|
||||||
Vector<Double> buf;
|
|
||||||
Vector<Double*> map(env.mRRange->size(),nullptr);
|
|
||||||
for(SizeT i = 0; i != data.size(); ++i){
|
for(SizeT i = 0; i != data.size(); ++i){
|
||||||
data[i] = static_cast<Double>(LSize*myrank*blocks+i);
|
data[i] = static_cast<Double>(LSize*myrank*blocks+i);
|
||||||
}
|
}
|
||||||
Vector<Vector<Double>> sendbuf(Nranks);
|
|
||||||
for(auto& sb: sendbuf){
|
|
||||||
sb.reserve(data.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
*rgj = 0;
|
*rgj = 0;
|
||||||
while(rgj->rank() != 1){
|
while(rgj->rank() != 1){
|
||||||
++*rgj;
|
++*rgj;
|
||||||
}
|
}
|
||||||
*rgj->local() = 0;
|
*rgj->local() = 0;
|
||||||
|
|
||||||
|
Vector<Double> buf;
|
||||||
|
Vector<const Double*> map(env.mRRange->size(),nullptr);
|
||||||
|
|
||||||
auto shift = [&](const auto& x){
|
auto shift = [&](const auto& x){
|
||||||
auto o = x;
|
auto o = x;
|
||||||
std::get<0>(o) += 1;
|
std::get<0>(o) += 1;
|
||||||
|
@ -237,100 +361,35 @@ void run2(const Env& env)
|
||||||
return o;
|
return o;
|
||||||
};
|
};
|
||||||
|
|
||||||
Vector<Vector<SizeT>> request(Nranks);
|
const Sptr<Vector<SizeT>> fmap = setupMap(rgj, rgi, shift);
|
||||||
SizeT cntx = 0;
|
setupBuffer(rgj, rgi, fmap, data, buf, map, env.mSRange->size());
|
||||||
const SizeT locsz = rgi->local()->lmax().val();
|
|
||||||
// First loop: setup send buffer
|
|
||||||
rgi->ifor( mapXpr(rgj, rgi, shift,
|
|
||||||
operation
|
|
||||||
( [&](SizeT p, SizeT q) {
|
|
||||||
const SizeT r = p / locsz;
|
|
||||||
if(myrank != r){
|
|
||||||
request[r].insert(p % locsz);
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
gj = rgj->lex();
|
|
||||||
*gj.pack()[C0] = (gj.pack()[C0]->lex() + 1) % gj.pack()[C0]->lmax().val();
|
|
||||||
*gj.pack()[C2] = (gj.pack()[C2]->lex() + 1) % gj.pack()[C2]->lmax().val();
|
|
||||||
*gj.pack()[C3] = (gj.pack()[C3]->lex() + 1) % gj.pack()[C3]->lmax().val();
|
|
||||||
gj();
|
|
||||||
*rgk = gj.lex();
|
|
||||||
if(myrank == 1){
|
|
||||||
std::cout << p << " " << rgk->pos() << ", "
|
|
||||||
<< r << " " << rgk->rank() << ", "
|
|
||||||
<< q << " " << rgj->local()->pos() << std::endl;
|
|
||||||
assert(p == rgk->pos());
|
|
||||||
assert(r == rgk->rank());
|
|
||||||
assert(q == rgj->local()->pos());
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
/*
|
|
||||||
++*rgj->local();
|
|
||||||
(*rgj)();
|
|
||||||
++cntx;
|
|
||||||
*/
|
|
||||||
} , posop(rgj), posop(rgi) ) ) ,
|
|
||||||
NoF {} )();
|
|
||||||
|
|
||||||
// transfer:
|
// Third loop: Check:
|
||||||
Vector<SizeT> reqsizes(Nranks);
|
for(*rgi = 0, gi = 0; rgi->lex() != rgi->lmax().val(); ++*rgi, ++gi){
|
||||||
SizeT bufsize = 0;
|
gj = gi.lex();
|
||||||
Vector<Vector<SizeT>> ext(Nranks);
|
*gj.pack()[C0] = (gj.pack()[C0]->lex() + 1) % gj.pack()[C0]->lmax().val();
|
||||||
for(auto& e: ext){
|
*gj.pack()[C2] = (gj.pack()[C2]->lex() + 1) % gj.pack()[C2]->lmax().val();
|
||||||
e.resize(Nranks);
|
*gj.pack()[C3] = (gj.pack()[C3]->lex() + 1) % gj.pack()[C3]->lmax().val();
|
||||||
}
|
gj();
|
||||||
for(SizeT i = 0; i != Nranks; ++i){
|
*rgj = gj.lex();
|
||||||
reqsizes[i] = request[i].size();
|
|
||||||
bufsize += reqsizes[i]*blocks;
|
if(rgi->rank() == myrank){
|
||||||
ext[myrank][i] = reqsize[i];
|
assert(map.data()[rgj->pos()] != nullptr);
|
||||||
}
|
|
||||||
MPI_Status stat;
|
const Double vn = *map[rgj->pos()]/blocks;
|
||||||
// transfer requests:
|
const SizeT xp = static_cast<SizeT>(vn);
|
||||||
for(SizeT o = 1; o != Nranks; ++o){
|
const SizeT orank = xp / env.mRRange->sub(1)->size();
|
||||||
const SizeT dstr = (myrank + o) % Nranks;
|
assert(env.mRRange->sub(1)->size() == 16*12*12*12/4);
|
||||||
const SizeT srcr = (myrank - o + Nranks) % Nranks;
|
if(myrank == 0){
|
||||||
SizeT sendsize = 0;
|
std::cout << " pos = " << rgj->pos() << " , val = " << *map[rgj->pos()]
|
||||||
MPI_Sendrecv(reqsizes.data()+dstr, 1, MPI_ULONG, dstr, 0,
|
<< " , val_norm = " << vn << " , origin rank = "
|
||||||
&sendsize, 1, MPI_ULONG, srcr, 0, MPI_COMM_WORLD, &stat);
|
<< orank << std::endl;
|
||||||
ext[srcr][myrank] = sendsize;
|
}
|
||||||
Vector<SizeT> sendpos(sendsize);
|
assert(orank == rgj->rank());
|
||||||
MPI_Sendrecv(requests[dstr].data(), reqsizes[dstr], MPI_ULONG, dstr, 0,
|
assert(vn == rgj->pos());
|
||||||
sendpos.data(), sendsize, MPI_ULONG, srcr, 0, MPI_COMM_WORLD, &stat);
|
|
||||||
sendbuf[srcr].resize(sendsize*blocks);
|
|
||||||
for(SizeT i = 0; i != sendsize; ++i){
|
|
||||||
std::memcpy( sendbuf[srcr]+i, data.data()+sendpos[i]*blocks, blocks );
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// transfer data:
|
CHECK;
|
||||||
for(SizeT o = 1; o != Nranks; ++o){
|
|
||||||
const SizeT dstr = (myrank + o) % Nranks;
|
|
||||||
const SizeT srcr = (myrank - o + Nranks) % Nranks;
|
|
||||||
SizeT off = 0;
|
|
||||||
for(SizeT p = 0; p != srcr; ++p){
|
|
||||||
off += ext[myrank][p];
|
|
||||||
}
|
|
||||||
MPI_Sendrecv(sendbuf[dstr].data(), ext[dstr][myrank]*blocks, MPI_DOUBLE, dstr, 0,
|
|
||||||
buf.data()+off*blocks, ext[myrank][srcr]*blocks, MPI_DOUBLE, srcr, 0,
|
|
||||||
MPI_COMM_WORLD, &stat);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Second loop: Assign map to target buffer positions:
|
|
||||||
Vector<SizeT> cnt(Nranks);
|
|
||||||
rgi->ifor( mapXpr(rgj, rgi, shift,
|
|
||||||
operation
|
|
||||||
( [&](SizeT p, SizeT q) {
|
|
||||||
const SizeT r = p / locsz;
|
|
||||||
if(myrank != r){
|
|
||||||
SizeT off = 0;
|
|
||||||
for(SizeT s = 0; s != r; ++s){
|
|
||||||
off += ext[myrank][s];
|
|
||||||
}
|
|
||||||
map[p] = buf.data() + off*blocks + cnt[r]*blocks;
|
|
||||||
++cnt[r];
|
|
||||||
}
|
|
||||||
map[q] = data.data() + q*blocks;
|
|
||||||
} , posop(rgj), posop(rgi) ) ) )();
|
|
||||||
|
|
||||||
MPI_Barrier(MPI_COMM_WORLD);
|
MPI_Barrier(MPI_COMM_WORLD);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue